diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java index 13af3b8e6c7..a8f7c3613b9 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java @@ -42,6 +42,18 @@ public interface MetricsRegionServerSourceFactory { */ MetricsRegionSource createRegion(MetricsRegionWrapper wrapper); + /** + * Create a MetricsUserSource from a user + * @return A metrics user source + */ + MetricsUserSource createUser(String shortUserName); + + /** + * Return the singleton instance for MetricsUserAggregateSource + * @return A metrics user aggregate source + */ + MetricsUserAggregateSource getUserAggregate(); + /** * Create a MetricsTableSource from a MetricsTableWrapper. * diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java new file mode 100644 index 00000000000..f5e0c288d68 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** +* This interface will be implemented by a MetricsSource that will export metrics from +* multiple users into the hadoop metrics system. +*/ +public interface MetricsUserAggregateSource extends BaseSource { + + /** + * The name of the metrics + */ + static final String METRICS_NAME = "Users"; + + /** + * The name of the metrics context that metrics will be under. + */ + static final String METRICS_CONTEXT = "regionserver"; + + /** + * Description + */ + static final String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver"; + + /** + * The name of the metrics context that metrics will be under in jmx + */ + static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + static final String NUM_USERS = "numUsers"; + static final String NUMBER_OF_USERS_DESC = "Number of users in the metrics system"; + + /** + * Returns a MetricsUserSource if already exists, or creates and registers one for this user + * @param user the user name + * @return a metrics user source + */ + MetricsUserSource getOrCreateMetricsUser(String user); + + void deregister(MetricsUserSource toRemove); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java new file mode 100644 index 00000000000..f60574f2ea4 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +public interface MetricsUserSource extends Comparable { + + String getUser(); + + void register(); + + void deregister(); + + void updatePut(long t); + + void updateDelete(long t); + + void updateGet(long t); + + void updateIncrement(long t); + + void updateAppend(long t); + + void updateReplay(long t); + + void updateScanTime(long t); +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java index b13268ac78e..49e3736c9ca 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java @@ -31,16 +31,26 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer public static enum FactoryStorage { INSTANCE; private Object aggLock = new Object(); - private MetricsRegionAggregateSourceImpl aggImpl; + private MetricsRegionAggregateSourceImpl regionAggImpl; + private MetricsUserAggregateSourceImpl userAggImpl; private MetricsTableAggregateSourceImpl tblAggImpl; } - private synchronized MetricsRegionAggregateSourceImpl getAggregate() { + private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() { synchronized (FactoryStorage.INSTANCE.aggLock) { - if (FactoryStorage.INSTANCE.aggImpl == null) { - FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl(); + if (FactoryStorage.INSTANCE.regionAggImpl == null) { + FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl(); } - return FactoryStorage.INSTANCE.aggImpl; + return FactoryStorage.INSTANCE.regionAggImpl; + } + } + + public synchronized MetricsUserAggregateSourceImpl getUserAggregate() { + synchronized (FactoryStorage.INSTANCE.aggLock) { + if (FactoryStorage.INSTANCE.userAggImpl == null) { + FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl(); + } + return FactoryStorage.INSTANCE.userAggImpl; } } @@ -61,7 +71,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer @Override public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) { - return new MetricsRegionSourceImpl(wrapper, getAggregate()); + return new MetricsRegionSourceImpl(wrapper, getRegionAggregate()); } @Override @@ -72,4 +82,10 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer public MetricsIOSource createIO(MetricsIOWrapper wrapper) { return new MetricsIOSourceImpl(wrapper); } + + @Override + public org.apache.hadoop.hbase.regionserver.MetricsUserSource createUser(String shortUserName) { + return new org.apache.hadoop.hbase.regionserver.MetricsUserSourceImpl(shortUserName, + getUserAggregate()); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java new file mode 100644 index 00000000000..c8dac06a3f2 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.Interns; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class MetricsUserAggregateSourceImpl extends BaseSourceImpl + implements MetricsUserAggregateSource { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsUserAggregateSourceImpl.class); + + private final ConcurrentHashMap userSources = + new ConcurrentHashMap(); + + public MetricsUserAggregateSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + public MetricsUserAggregateSourceImpl(String metricsName, + String metricsDescription, + String metricsContext, + String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + } + + @Override + public MetricsUserSource getOrCreateMetricsUser(String user) { + MetricsUserSource source = userSources.get(user); + if (source != null) { + return source; + } + source = new MetricsUserSourceImpl(user, this); + MetricsUserSource prev = userSources.putIfAbsent(user, source); + + if (prev != null) { + return prev; + } else { + // register the new metrics now + register(source); + } + return source; + } + + public void register(MetricsUserSource source) { + synchronized (this) { + source.register(); + } + } + + @Override + public void deregister(MetricsUserSource toRemove) { + try { + synchronized (this) { + MetricsUserSource source = userSources.remove(toRemove.getUser()); + if (source != null) { + source.deregister(); + } + } + } catch (Exception e) { + // Ignored. If this errors out it means that someone is double + // closing the user source and the user metrics is already nulled out. + LOG.info("Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e); + } + } + + @VisibleForTesting + public ConcurrentHashMap getUserSources() { + return userSources; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder mrb = collector.addRecord(metricsName); + + if (userSources != null) { + for (MetricsUserSource userMetricSource : userSources.values()) { + if (userMetricSource instanceof MetricsUserSourceImpl) { + ((MetricsUserSourceImpl) userMetricSource).snapshot(mrb, all); + } + } + mrb.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size()); + metricsRegistry.snapshot(mrb, all); + } + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java new file mode 100644 index 00000000000..5e1ce313a5b --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class MetricsUserSourceImpl implements MetricsUserSource { + private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class); + + private final String userNamePrefix; + + private final String user; + + private final String userGetKey; + private final String userScanTimeKey; + private final String userPutKey; + private final String userDeleteKey; + private final String userIncrementKey; + private final String userAppendKey; + private final String userReplayKey; + + private MetricHistogram getHisto; + private MetricHistogram scanTimeHisto; + private MetricHistogram putHisto; + private MetricHistogram deleteHisto; + private MetricHistogram incrementHisto; + private MetricHistogram appendHisto; + private MetricHistogram replayHisto; + + private final int hashCode; + + private AtomicBoolean closed = new AtomicBoolean(false); + private final MetricsUserAggregateSourceImpl agg; + private final DynamicMetricsRegistry registry; + + public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new MetricsUserSourceImpl for user " + user); + } + + this.user = user; + this.agg = agg; + this.registry = agg.getMetricsRegistry(); + + this.userNamePrefix = "user_" + user + "_metric_"; + + hashCode = userNamePrefix.hashCode(); + + userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY; + userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY; + userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY; + userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY; + userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY; + userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY; + userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY; + + agg.register(this); + } + + @Override + public void register() { + synchronized (this) { + getHisto = registry.newTimeHistogram(userGetKey); + scanTimeHisto = registry.newTimeHistogram(userScanTimeKey); + putHisto = registry.newTimeHistogram(userPutKey); + deleteHisto = registry.newTimeHistogram(userDeleteKey); + incrementHisto = registry.newTimeHistogram(userIncrementKey); + appendHisto = registry.newTimeHistogram(userAppendKey); + replayHisto = registry.newTimeHistogram(userReplayKey); + } + } + + @Override + public void deregister() { + boolean wasClosed = closed.getAndSet(true); + + // Has someone else already closed this for us? + if (wasClosed) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing user Metrics for user: " + user); + } + + synchronized (this) { + registry.removeMetric(userGetKey); + registry.removeMetric(userScanTimeKey); + registry.removeMetric(userPutKey); + registry.removeMetric(userDeleteKey); + registry.removeMetric(userIncrementKey); + registry.removeMetric(userAppendKey); + registry.removeMetric(userReplayKey); + } + } + + @Override + public String getUser() { + return user; + } + + @Override + public int compareTo(MetricsUserSource source) { + if (source == null) { + return -1; + } + if (!(source instanceof MetricsUserSourceImpl)) { + return -1; + } + + MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source; + + return Long.compare(hashCode, impl.hashCode); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0); + } + + void snapshot(MetricsRecordBuilder mrb, boolean ignored) { + // If there is a close that started be double extra sure + // that we're not getting any locks and not putting data + // into the metrics that should be removed. So early out + // before even getting the lock. + if (closed.get()) { + return; + } + + // Grab the read + // This ensures that removes of the metrics + // can't happen while we are putting them back in. + synchronized (this) { + + // It's possible that a close happened between checking + // the closed variable and getting the lock. + if (closed.get()) { + return; + } + } + } + + @Override + public void updatePut(long t) { + putHisto.add(t); + } + + @Override + public void updateDelete(long t) { + deleteHisto.add(t); + } + + @Override + public void updateGet(long t) { + getHisto.add(t); + } + + @Override + public void updateIncrement(long t) { + incrementHisto.add(t); + } + + @Override + public void updateAppend(long t) { + appendHisto.add(t); + } + + @Override + public void updateReplay(long t) { + replayHisto.add(t); + } + + @Override + public void updateScanTime(long t) { + scanTimeHisto.add(t); + } +} diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java new file mode 100644 index 00000000000..3155e66fc98 --- /dev/null +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.testclassification.MetricsTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MetricsTests.class, SmallTests.class}) +public class TestMetricsUserSourceImpl { + + @Test + public void testCompareToHashCodeEquals() throws Exception { + MetricsRegionServerSourceFactory fact + = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class); + + MetricsUserSource one = fact.createUser("ONE"); + MetricsUserSource oneClone = fact.createUser("ONE"); + MetricsUserSource two = fact.createUser("TWO"); + + assertEquals(0, one.compareTo(oneClone)); + assertEquals(one.hashCode(), oneClone.hashCode()); + assertNotEquals(one, two); + + assertTrue(one.compareTo(two) != 0); + assertTrue(two.compareTo(one) != 0); + assertTrue(two.compareTo(one) != one.compareTo(two)); + assertTrue(two.compareTo(two) == 0); + } + + + @Test (expected = RuntimeException.class) + public void testNoGetRegionServerMetricsSourceImpl() throws Exception { + // This should throw an exception because MetricsUserSourceImpl should only + // be created by a factory. + CompatibilitySingletonFactory.getInstance(MetricsUserSource.class); + } + + @Test + public void testGetUser() { + MetricsRegionServerSourceFactory fact + = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class); + + MetricsUserSource one = fact.createUser("ONE"); + assertEquals("ONE", one.getUser()); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index a5a880bc964..1d745adb0ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.util.LossyCounting; public class MetaTableMetrics extends BaseRegionObserver { private MetricRegistry registry; - private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; + private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; private Set metrics = new HashSet(); @@ -251,14 +251,15 @@ public class MetaTableMetrics extends BaseRegionObserver { .equals(TableName.META_TABLE_NAME)) { RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){ - @Override public void sweep(String key) { - registry.remove(key); - metrics.remove(key); - } + LossyCounting.LossyCountingListener listener = + new LossyCounting.LossyCountingListener() { + @Override public void sweep(String key) { + registry.remove(key); + metrics.remove(key); + } }; - clientMetricsLossyCounting = new LossyCounting(listener); - regionMetricsLossyCounting = new LossyCounting(listener); + clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener); + regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener); // only be active mode when this region holds meta table. active = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index e51d22986cf..44147e7e475 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -44,9 +44,10 @@ public class MetricsRegionServer { "hbase.regionserver.enable.table.latencies"; public static final boolean RS_ENABLE_TABLE_METRICS_DEFAULT = true; - private MetricsRegionServerSource serverSource; - private MetricsRegionServerWrapper regionServerWrapper; - private RegionServerTableMetrics tableMetrics; + private final MetricsRegionServerSource serverSource; + private final MetricsRegionServerWrapper regionServerWrapper; + private final RegionServerTableMetrics tableMetrics; + private final MetricsUserAggregate userAggregate; private MetricRegistry metricRegistry; private Timer bulkLoadTimer; @@ -56,8 +57,8 @@ public class MetricsRegionServer { public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf) { this(regionServerWrapper, CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) - .createServer(regionServerWrapper), - createTableMetrics(conf)); + .createServer(regionServerWrapper), createTableMetrics(conf), + MetricsUserAggregateFactory.getMetricsUserAggregate(conf)); // Create hbase-metrics module based metrics. The registry should already be registered by the // MetricsRegionServerSource @@ -70,11 +71,12 @@ public class MetricsRegionServer { } MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, - MetricsRegionServerSource serverSource, - RegionServerTableMetrics tableMetrics) { + MetricsRegionServerSource serverSource, RegionServerTableMetrics tableMetrics, + MetricsUserAggregate userAggregate) { this.regionServerWrapper = regionServerWrapper; this.serverSource = serverSource; this.tableMetrics = tableMetrics; + this.userAggregate = userAggregate; } /** @@ -92,6 +94,11 @@ public class MetricsRegionServer { return serverSource; } + @VisibleForTesting + public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() { + return userAggregate; + } + public MetricsRegionServerWrapper getRegionServerWrapper() { return regionServerWrapper; } @@ -111,6 +118,7 @@ public class MetricsRegionServer { tableMetrics.updatePut(tn, t); } serverSource.updatePut(t); + userAggregate.updatePut(t); } public void updateDelete(TableName tn, long t) { @@ -118,6 +126,7 @@ public class MetricsRegionServer { tableMetrics.updateDelete(tn, t); } serverSource.updateDelete(t); + userAggregate.updateDelete(t); } public void updateDeleteBatch(TableName tn, long t) { @@ -146,6 +155,7 @@ public class MetricsRegionServer { serverSource.incrSlowGet(); } serverSource.updateGet(t); + userAggregate.updateGet(t); } public void updateIncrement(TableName tn, long t) { @@ -156,6 +166,7 @@ public class MetricsRegionServer { serverSource.incrSlowIncrement(); } serverSource.updateIncrement(t); + userAggregate.updateIncrement(t); } public void updateAppend(TableName tn, long t) { @@ -166,10 +177,12 @@ public class MetricsRegionServer { serverSource.incrSlowAppend(); } serverSource.updateAppend(t); + userAggregate.updateAppend(t); } public void updateReplay(long t){ serverSource.updateReplay(t); + userAggregate.updateReplay(t); } public void updateScanSize(TableName tn, long scanSize){ @@ -184,6 +197,7 @@ public class MetricsRegionServer { tableMetrics.updateScanTime(tn, t); } serverSource.updateScanTime(t); + userAggregate.updateScanTime(t); } public void updateSplitTime(long t) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java new file mode 100644 index 00000000000..983f05db55c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsUserAggregate { + + void updatePut(long t); + + void updateDelete(long t); + + void updateGet(long t); + + void updateIncrement(long t); + + void updateAppend(long t); + + void updateReplay(long t); + + void updateScanTime(long t); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java new file mode 100644 index 00000000000..4d3437dbe08 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsUserAggregateFactory { + private MetricsUserAggregateFactory() { + + } + public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled"; + public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true; + + public static MetricsUserAggregate getMetricsUserAggregate(Configuration conf) { + if (conf.getBoolean(METRIC_USER_ENABLED_CONF, DEFAULT_METRIC_USER_ENABLED_CONF)) { + return new MetricsUserAggregateImpl(conf); + } else { + //NoOpMetricUserAggregate + return new MetricsUserAggregate() { + @Override public void updatePut(long t) { + + } + + @Override public void updateDelete(long t) { + + } + + @Override public void updateGet(long t) { + + } + + @Override public void updateIncrement(long t) { + + } + + @Override public void updateAppend(long t) { + + } + + @Override public void updateReplay(long t) { + + } + + @Override public void updateScanTime(long t) { + + } + }; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java new file mode 100644 index 00000000000..731e642a1ce --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.LossyCounting; + +@InterfaceAudience.Private +public class MetricsUserAggregateImpl implements MetricsUserAggregate{ + + /** Provider for mapping principal names to Users */ + private final UserProvider userProvider; + + private final MetricsUserAggregateSource source; + private final LossyCounting userMetricLossyCounting; + + public MetricsUserAggregateImpl(Configuration conf) { + source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) + .getUserAggregate(); + userMetricLossyCounting = new LossyCounting("userMetrics", + new LossyCounting.LossyCountingListener() { + @Override + public void sweep(MetricsUserSource key) { + source.deregister(key); + } + }); + this.userProvider = UserProvider.instantiate(conf); + } + + /** + * Returns the active user to which authorization checks should be applied. + * If we are in the context of an RPC call, the remote user is used, + * otherwise the currently logged in user is used. + */ + private String getActiveUser() { + User user = RpcServer.getRequestUser(); + if (user == null) { + try { + user = userProvider.getCurrent(); + } catch (IOException e) { } + } + return user != null ? user.getShortName() : null; + } + + @VisibleForTesting + MetricsUserAggregateSource getSource() { + return source; + } + + @Override + public void updatePut(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updatePut(t); + } + } + + @Override + public void updateDelete(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateDelete(t); + } + } + + @Override + public void updateGet(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateGet(t); + } + } + + @Override + public void updateIncrement(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateIncrement(t); + } + } + + @Override + public void updateAppend(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateAppend(t); + } + } + + @Override + public void updateReplay(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateReplay(t); + } + } + + @Override + public void updateScanTime(long t) { + String user = getActiveUser(); + if (user != null) { + getOrCreateMetricsUser(user).updateScanTime(t); + } + } + + private MetricsUserSource getOrCreateMetricsUser(String user) { + MetricsUserSource userSource = source.getOrCreateMetricsUser(user); + userMetricLossyCounting.add(userSource); + return userSource; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index 255f7206de3..95e974933b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -19,13 +19,20 @@ package org.apache.hadoop.hbase.util; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * LossyCounting utility, bounded data structure that maintains approximate high frequency @@ -39,35 +46,41 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private -public class LossyCounting { +public class LossyCounting { + private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); + private final ExecutorService executor; private long bucketSize; private int currentTerm; - private Map data; + private Map data; private long totalDataCount; + private final String name; private LossyCountingListener listener; + private static AtomicReference fut = new AtomicReference<>(null); - public interface LossyCountingListener { - void sweep(String key); + public interface LossyCountingListener { + void sweep(T key); } - public LossyCounting(double errorRate, LossyCountingListener listener) { + public LossyCounting(double errorRate, String name, LossyCountingListener listener) { if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); } + this.name = name; this.bucketSize = (long) Math.ceil(1 / errorRate); this.currentTerm = 1; this.totalDataCount = 0; this.data = new ConcurrentHashMap<>(); this.listener = listener; calculateCurrentTerm(); + executor = Executors.newSingleThreadExecutor(); } - public LossyCounting(LossyCountingListener listener) { + public LossyCounting(String name, LossyCountingListener listener) { this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), - listener); + name, listener); } - private void addByOne(String key) { + private void addByOne(T key) { //If entry exists, we update the entry by incrementing its frequency by one. Otherwise, //we create a new entry starting with currentTerm so that it will not be pruned immediately Integer i = data.get(key); @@ -80,22 +93,28 @@ public class LossyCounting { calculateCurrentTerm(); } - public void add(String key) { + public void add(T key) { addByOne(key); if(totalDataCount % bucketSize == 0) { //sweep the entries at bucket boundaries - sweep(); + //run Sweep + Future future = fut.get(); + if (future != null && !future.isDone()){ + return; + } + future = executor.submit(new SweepRunnable()); + fut.set(future); } } /** * sweep low frequency data - * @return Names of elements got swept */ - private void sweep() { - for(Map.Entry entry : data.entrySet()) { + @VisibleForTesting + public void sweep() { + for(Map.Entry entry : data.entrySet()) { if(entry.getValue() < currentTerm) { - String metric = entry.getKey(); + T metric = entry.getKey(); data.remove(metric); if (listener != null) { listener.sweep(metric); @@ -119,16 +138,33 @@ public class LossyCounting { return data.size(); } - public boolean contains(String key) { + public boolean contains(T key) { return data.containsKey(key); } - public Set getElements(){ + public Set getElements(){ return data.keySet(); } public long getCurrentTerm() { return currentTerm; } + + class SweepRunnable implements Runnable { + @Override public void run() { + if (LOG.isTraceEnabled()) { + LOG.trace("Starting sweep of lossyCounting-" + name); + } + try { + sweep(); + } catch (Exception exception) { + LOG.debug("Error while sweeping lossyCounting-" + name, exception); + } + } + } + + @VisibleForTesting public Future getSweepFuture() { + return fut.get(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java index 32ffbf815d4..5bd0e9d04ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertTrue; import java.io.IOException; + import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.TableName; @@ -32,8 +33,8 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestMetricsTableLatencies { - public static MetricsAssertHelper HELPER = - CompatibilityFactory.getInstance(MetricsAssertHelper.class); + private static MetricsAssertHelper HELPER = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); @Test public void testTableWrapperAggregateMetrics() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java new file mode 100644 index 00000000000..8b618abf349 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.security.PrivilegedAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMetricsUserAggregate { + + public static MetricsAssertHelper HELPER = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + + private MetricsRegionServerWrapperStub wrapper; + private MetricsRegionServer rsm; + private MetricsUserAggregateImpl userAgg; + private TableName tableName = TableName.valueOf("testUserAggregateMetrics"); + + @BeforeClass + public static void classSetUp() { + HELPER.init(); + } + + @Before + public void setUp() { + wrapper = new MetricsRegionServerWrapperStub(); + Configuration conf = HBaseConfiguration.create(); + rsm = new MetricsRegionServer(wrapper, conf); + userAgg = (MetricsUserAggregateImpl)rsm.getMetricsUserAggregate(); + } + + private void doOperations() { + for (int i=0; i < 10; i ++) { + rsm.updateGet(tableName,10); + } + for (int i=0; i < 11; i ++) { + rsm.updateScanTime(tableName,11); + } + for (int i=0; i < 12; i ++) { + rsm.updatePut(tableName,12); + } + for (int i=0; i < 13; i ++) { + rsm.updateDelete(tableName,13); + } + for (int i=0; i < 14; i ++) { + rsm.updateIncrement(tableName,14); + } + for (int i=0; i < 15; i ++) { + rsm.updateAppend(tableName,15); + } + for (int i=0; i < 16; i ++) { + rsm.updateReplay(16); + } + } + + @Test + public void testPerUserOperations() { + Configuration conf = HBaseConfiguration.create(); + User userFoo = User.createUserForTesting(conf, "FOO", new String[0]); + User userBar = User.createUserForTesting(conf, "BAR", new String[0]); + + userFoo.getUGI().doAs(new PrivilegedAction() { + @Override + public Void run() { + doOperations(); + return null; + } + }); + + userBar.getUGI().doAs(new PrivilegedAction() { + @Override + public Void run() { + doOperations(); + return null; + } + }); + + HELPER.assertCounter("userfoometricgetnumops", 10, userAgg.getSource()); + HELPER.assertCounter("userfoometricscantimenumops", 11, userAgg.getSource()); + HELPER.assertCounter("userfoometricputnumops", 12, userAgg.getSource()); + HELPER.assertCounter("userfoometricdeletenumops", 13, userAgg.getSource()); + HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource()); + HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource()); + HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource()); + + HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource()); + HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource()); + HELPER.assertCounter("userbarmetricputnumops", 12, userAgg.getSource()); + HELPER.assertCounter("userbarmetricdeletenumops", 13, userAgg.getSource()); + HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource()); + HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource()); + HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource()); + } + + @Test public void testLossyCountingOfUserMetrics() { + Configuration conf = HBaseConfiguration.create(); + int noOfUsers = 10000; + for (int i = 1; i <= noOfUsers; i++) { + User.createUserForTesting(conf, "FOO" + i, new String[0]).getUGI() + .doAs(new PrivilegedAction() { + @Override public Void run() { + rsm.updateGet(tableName, 10); + return null; + } + }); + } + assertTrue( + ((MetricsUserAggregateSourceImpl) userAgg.getSource()).getUserSources().size() <= (noOfUsers + / 10)); + for (int i = 1; i <= noOfUsers / 10; i++) { + assertFalse( + HELPER.checkCounterExists("userfoo" + i + "metricgetnumops", userAgg.getSource())); + } + HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index 548d31adaa0..4a634e12169 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; - import org.junit.Test; import org.junit.experimental.categories.Category; @@ -32,15 +31,15 @@ public class TestLossyCounting { @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01, null); + LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting(null); + LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01, null); + LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null); for(int i = 0; i < 100; i++){ String key = "" + i; lossyCounting.add(key); @@ -54,30 +53,46 @@ public class TestLossyCounting { @Test public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01, null); + LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null); for(int i = 0; i < 400; i++){ String key = "" + i; lossyCounting.add(key); } assertEquals(4L, lossyCounting.getCurrentTerm()); - //if total rows added are proportional to bucket size + waitForSweep(lossyCounting); + + //Do last one sweep as some sweep will be skipped when first one was running + lossyCounting.sweep(); assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize()); } + private void waitForSweep(LossyCounting lossyCounting) { + //wait for sweep thread to complete + int retry = 0; + while (!lossyCounting.getSweepFuture().isDone() && retry < 10) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + retry++; + } + } + @Test public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1, null); + LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null); for(int i = 0; i < 10; i++){ String key = "" + i; lossyCounting.add(key); } + waitForSweep(lossyCounting); assertEquals(10L, lossyCounting.getDataSize()); for(int i = 0; i < 10; i++){ String key = "1"; lossyCounting.add(key); } + waitForSweep(lossyCounting); assertEquals(1L, lossyCounting.getDataSize()); } - } \ No newline at end of file