HBASE-15518 Add Per-Table metrics back (Alicia Ying Shu)
This commit is contained in:
parent
6dd938c20b
commit
18d70bc680
|
@ -34,8 +34,24 @@ public interface MetricsRegionServerSourceFactory {
|
|||
/**
|
||||
* Create a MetricsRegionSource from a MetricsRegionWrapper.
|
||||
*
|
||||
* @param wrapper
|
||||
* @param wrapper The wrapped region
|
||||
* @return A metrics region source
|
||||
*/
|
||||
MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
|
||||
|
||||
/**
|
||||
* Create a MetricsTableSource from a MetricsTableWrapper.
|
||||
*
|
||||
* @param table The table name
|
||||
* @param wrapper The wrapped table aggregate
|
||||
* @return A metrics table source
|
||||
*/
|
||||
MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper);
|
||||
|
||||
/**
|
||||
* Get a MetricsTableAggregateSource
|
||||
*
|
||||
* @return A metrics table aggregate source
|
||||
*/
|
||||
MetricsTableAggregateSource getTableAggregate();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||
|
||||
/**
|
||||
* This interface will be implemented by a MetricsSource that will export metrics from
|
||||
* multiple regions of a table into the hadoop metrics system.
|
||||
*/
|
||||
public interface MetricsTableAggregateSource extends BaseSource {
|
||||
|
||||
/**
|
||||
* The name of the metrics
|
||||
*/
|
||||
String METRICS_NAME = "Tables";
|
||||
|
||||
/**
|
||||
* The name of the metrics context that metrics will be under.
|
||||
*/
|
||||
String METRICS_CONTEXT = "regionserver";
|
||||
|
||||
/**
|
||||
* Description
|
||||
*/
|
||||
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer tables";
|
||||
|
||||
/**
|
||||
* The name of the metrics context that metrics will be under in jmx
|
||||
*/
|
||||
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
|
||||
|
||||
String NUM_TABLES = "numTables";
|
||||
String NUMBER_OF_TABLES_DESC = "Number of tables in the metrics system";
|
||||
|
||||
/**
|
||||
* Register a MetricsTableSource as being open.
|
||||
*
|
||||
* @param table The table name
|
||||
* @param source the source for the table being opened.
|
||||
*/
|
||||
void register(String table, MetricsTableSource source);
|
||||
|
||||
/**
|
||||
* Remove a table's source. This is called when regions of a table are closed.
|
||||
*
|
||||
* @param table The table name
|
||||
*/
|
||||
void deregister(String table);
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
/**
|
||||
* This interface will be implemented to allow region server to push table metrics into
|
||||
* MetricsRegionAggregateSource that will in turn push data to the Hadoop metrics system.
|
||||
*/
|
||||
public interface MetricsTableSource extends Comparable<MetricsTableSource> {
|
||||
|
||||
String READ_REQUEST_COUNT = "readRequestCount";
|
||||
String READ_REQUEST_COUNT_DESC = "Number fo read requests";
|
||||
String WRITE_REQUEST_COUNT = "writeRequestCount";
|
||||
String WRITE_REQUEST_COUNT_DESC = "Number fo write requests";
|
||||
String TOTAL_REQUEST_COUNT = "totalRequestCount";
|
||||
String TOTAL_REQUEST_COUNT_DESC = "Number fo total requests";
|
||||
|
||||
String getTableName();
|
||||
|
||||
/**
|
||||
* Close the table's metrics as all the region are closing.
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Get the aggregate source to which this reports.
|
||||
*/
|
||||
MetricsTableAggregateSource getAggregateSource();
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
|
||||
/**
|
||||
* Interface of class that will wrap a MetricsTableSource and export numbers so they can be
|
||||
* used in MetricsTableSource
|
||||
*/
|
||||
public interface MetricsTableWrapperAggregate {
|
||||
|
||||
/**
|
||||
* Get the number of read requests that have been issued against this table
|
||||
*/
|
||||
long getReadRequestsCount(String table);
|
||||
|
||||
/**
|
||||
* Get the number of write requests that have been issued against this table
|
||||
*/
|
||||
long getWriteRequestsCount(String table);
|
||||
|
||||
/**
|
||||
* Get the total number of requests that have been issued against this table
|
||||
*/
|
||||
long getTotalRequestsCount(String table);
|
||||
}
|
|
@ -29,6 +29,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
|
|||
INSTANCE;
|
||||
private Object aggLock = new Object();
|
||||
private MetricsRegionAggregateSourceImpl aggImpl;
|
||||
private MetricsTableAggregateSourceImpl tblAggImpl;
|
||||
}
|
||||
|
||||
private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
|
||||
|
@ -40,6 +41,15 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized MetricsTableAggregateSourceImpl getTableAggregate() {
|
||||
synchronized (FactoryStorage.INSTANCE.aggLock) {
|
||||
if (FactoryStorage.INSTANCE.tblAggImpl == null) {
|
||||
FactoryStorage.INSTANCE.tblAggImpl = new MetricsTableAggregateSourceImpl();
|
||||
}
|
||||
return FactoryStorage.INSTANCE.tblAggImpl;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) {
|
||||
|
@ -50,4 +60,9 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
|
|||
public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
|
||||
return new MetricsRegionSourceImpl(wrapper, getAggregate());
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper) {
|
||||
return new MetricsTableSourceImpl(table, getTableAggregate(), wrapper);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
|
||||
implements MetricsTableAggregateSource {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MetricsTableAggregateSourceImpl.class);
|
||||
private ConcurrentHashMap<String, MetricsTableSource> tableSources = new ConcurrentHashMap<>();
|
||||
|
||||
public MetricsTableAggregateSourceImpl() {
|
||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
|
||||
}
|
||||
|
||||
public MetricsTableAggregateSourceImpl(String metricsName,
|
||||
String metricsDescription,
|
||||
String metricsContext,
|
||||
String metricsJmxContext) {
|
||||
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(String table, MetricsTableSource source) {
|
||||
tableSources.put(table, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deregister(String table) {
|
||||
try {
|
||||
tableSources.remove(table);
|
||||
} catch (Exception e) {
|
||||
// Ignored. If this errors out it means that someone is double
|
||||
// closing the region source and the region is already nulled out.
|
||||
LOG.info(
|
||||
"Error trying to remove " + table + " from " + this.getClass().getSimpleName(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
|
||||
* expectations of java programmers. Instead of returning anything Hadoop metrics expects
|
||||
* getMetrics to push the metrics into the collector.
|
||||
*
|
||||
* @param collector the collector
|
||||
* @param all get all the metrics regardless of when they last changed.
|
||||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
|
||||
|
||||
if (tableSources != null) {
|
||||
for (MetricsTableSource tableMetricSource : tableSources.values()) {
|
||||
if (tableMetricSource instanceof MetricsTableSourceImpl) {
|
||||
((MetricsTableSourceImpl) tableMetricSource).snapshot(mrb, all);
|
||||
}
|
||||
}
|
||||
mrb.addGauge(Interns.info(NUM_TABLES, NUMBER_OF_TABLES_DESC), tableSources.size());
|
||||
metricsRegistry.snapshot(mrb, all);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsTableSourceImpl implements MetricsTableSource {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MetricsTableSourceImpl.class);
|
||||
|
||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
// Non-final so that we can null out the wrapper
|
||||
// This is just paranoia. We really really don't want to
|
||||
// leak a whole table by way of keeping the
|
||||
// tableWrapper around too long.
|
||||
private MetricsTableWrapperAggregate tableWrapperAgg;
|
||||
private final MetricsTableAggregateSourceImpl agg;
|
||||
private final DynamicMetricsRegistry registry;
|
||||
private final String tableNamePrefix;
|
||||
private final TableName tableName;
|
||||
private final int hashCode;
|
||||
|
||||
public MetricsTableSourceImpl(String tblName,
|
||||
MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
|
||||
LOG.debug("Creating new MetricsTableSourceImpl for table ");
|
||||
this.tableName = TableName.valueOf(tblName);
|
||||
this.agg = aggregate;
|
||||
agg.register(tblName, this);
|
||||
this.tableWrapperAgg = tblWrapperAgg;
|
||||
this.registry = agg.getMetricsRegistry();
|
||||
this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
|
||||
"_table_" + this.tableName.getQualifierAsString() + "_metric_";
|
||||
this.hashCode = this.tableName.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
boolean wasClosed = closed.getAndSet(true);
|
||||
|
||||
// Has someone else already closed this for us?
|
||||
if (wasClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Before removing the metrics remove this table from the aggregate table bean.
|
||||
// This should mean that it's unlikely that snapshot and close happen at the same time.
|
||||
agg.deregister(tableName.getNameAsString());
|
||||
|
||||
// While it's un-likely that snapshot and close happen at the same time it's still possible.
|
||||
// So grab the lock to ensure that all calls to snapshot are done before we remove the metrics
|
||||
synchronized (this) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Removing table Metrics for table ");
|
||||
}
|
||||
tableWrapperAgg = null;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public MetricsTableAggregateSource getAggregateSource() {
|
||||
return agg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(MetricsTableSource source) {
|
||||
if (!(source instanceof MetricsTableSourceImpl)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source;
|
||||
if (impl == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return Long.compare(hashCode, impl.hashCode);
|
||||
}
|
||||
|
||||
void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
|
||||
|
||||
// If there is a close that started be double extra sure
|
||||
// that we're not getting any locks and not putting data
|
||||
// into the metrics that should be removed. So early out
|
||||
// before even getting the lock.
|
||||
if (closed.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Grab the read
|
||||
// This ensures that removes of the metrics
|
||||
// can't happen while we are putting them back in.
|
||||
synchronized (this) {
|
||||
|
||||
// It's possible that a close happened between checking
|
||||
// the closed variable and getting the lock.
|
||||
if (closed.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.tableWrapperAgg != null) {
|
||||
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.READ_REQUEST_COUNT,
|
||||
MetricsTableSource.READ_REQUEST_COUNT_DESC),
|
||||
tableWrapperAgg.getReadRequestsCount(tableName.getNameAsString()));
|
||||
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.WRITE_REQUEST_COUNT,
|
||||
MetricsTableSource.WRITE_REQUEST_COUNT_DESC),
|
||||
tableWrapperAgg.getWriteRequestsCount(tableName.getNameAsString()));
|
||||
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.TOTAL_REQUEST_COUNT,
|
||||
MetricsTableSource.TOTAL_REQUEST_COUNT_DESC),
|
||||
tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTableName() {
|
||||
return tableName.getNameAsString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hashCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
return (o instanceof MetricsTableSourceImpl && compareTo((MetricsTableSourceImpl) o) == 0);
|
||||
}
|
||||
|
||||
public MetricsTableWrapperAggregate getTableWrapper() {
|
||||
return tableWrapperAgg;
|
||||
}
|
||||
|
||||
public String getTableNamePrefix() {
|
||||
return tableNamePrefix;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MetricsTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test for MetricsTableSourceImpl
|
||||
*/
|
||||
@Category({MetricsTests.class, SmallTests.class})
|
||||
public class TestMetricsTableSourceImpl {
|
||||
|
||||
@Test
|
||||
public void testCompareToHashCode() throws Exception {
|
||||
MetricsRegionServerSourceFactory metricsFact =
|
||||
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
|
||||
|
||||
MetricsTableSource one = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
|
||||
MetricsTableSource oneClone = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
|
||||
MetricsTableSource two = metricsFact.createTable("TWOTABLE", new TableWrapperStub("TWOTABLE"));
|
||||
|
||||
assertEquals(0, one.compareTo(oneClone));
|
||||
assertEquals(one.hashCode(), oneClone.hashCode());
|
||||
assertNotEquals(one, two);
|
||||
|
||||
assertTrue(one.compareTo(two) != 0);
|
||||
assertTrue(two.compareTo(one) != 0);
|
||||
assertTrue(two.compareTo(one) != one.compareTo(two));
|
||||
assertTrue(two.compareTo(two) == 0);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void testNoGetTableMetricsSourceImpl() throws Exception {
|
||||
// This should throw an exception because MetricsTableSourceImpl should only
|
||||
// be created by a factory.
|
||||
CompatibilitySingletonFactory.getInstance(MetricsTableSourceImpl.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTableMetrics() throws Exception{
|
||||
MetricsTableSource oneTbl =
|
||||
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
||||
.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
|
||||
assertEquals("ONETABLE", oneTbl.getTableName());
|
||||
}
|
||||
|
||||
static class TableWrapperStub implements MetricsTableWrapperAggregate {
|
||||
|
||||
private String tableName;
|
||||
|
||||
public TableWrapperStub(String tableName) {
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRequestsCount(String table) {
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRequestsCount(String table) {
|
||||
return 20;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalRequestsCount(String table) {
|
||||
return 30;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -346,6 +346,7 @@ public class HRegionServer extends HasThread implements
|
|||
public static final String REGIONSERVER = "regionserver";
|
||||
|
||||
MetricsRegionServer metricsRegionServer;
|
||||
MetricsTable metricsTable;
|
||||
private SpanReceiverHost spanReceiverHost;
|
||||
|
||||
/**
|
||||
|
@ -1408,6 +1409,7 @@ public class HRegionServer extends HasThread implements
|
|||
this.walFactory = setupWALAndReplication();
|
||||
// Init in here rather than in constructor after thread name has been set
|
||||
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
|
||||
this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
|
||||
|
||||
startServiceThreads();
|
||||
startHeapMemoryManager();
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsTable {
|
||||
private final MetricsTableAggregateSource tableSourceAgg;
|
||||
private MetricsTableWrapperAggregate tableWrapperAgg;
|
||||
|
||||
public MetricsTable(final MetricsTableWrapperAggregate wrapper) {
|
||||
tableSourceAgg = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
||||
.getTableAggregate();
|
||||
this.tableWrapperAgg = wrapper;
|
||||
}
|
||||
|
||||
public MetricsTableWrapperAggregate getTableWrapperAgg() {
|
||||
return tableWrapperAgg;
|
||||
}
|
||||
|
||||
public MetricsTableAggregateSource getTableSourceAgg() {
|
||||
return tableSourceAgg;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsExecutor;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggregate, Closeable {
|
||||
private final HRegionServer regionServer;
|
||||
private ScheduledExecutorService executor;
|
||||
private Runnable runnable;
|
||||
private long period;
|
||||
private ScheduledFuture<?> tableMetricsUpdateTask;
|
||||
private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap = new ConcurrentHashMap<>();
|
||||
|
||||
public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
|
||||
this.regionServer = regionServer;
|
||||
this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
|
||||
HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
|
||||
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
|
||||
this.runnable = new TableMetricsWrapperRunnable();
|
||||
this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, this.period,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public class TableMetricsWrapperRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
|
||||
|
||||
for (Region r : regionServer.getOnlineRegionsLocalContext()) {
|
||||
TableName tbl= r.getTableDesc().getTableName();
|
||||
MetricsTableValues metricsTable = localMetricsTableMap.get(tbl);
|
||||
if (metricsTable == null) {
|
||||
metricsTable = new MetricsTableValues();
|
||||
localMetricsTableMap.put(tbl, metricsTable);
|
||||
}
|
||||
metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());
|
||||
metricsTable.setWriteRequestsCount(metricsTable.getWriteRequestsCount() + r.getWriteRequestsCount());
|
||||
metricsTable.setTotalRequestsCount(metricsTable.getReadRequestsCount() + metricsTable.getWriteRequestsCount());
|
||||
}
|
||||
|
||||
for(Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
|
||||
TableName tbl = entry.getKey();
|
||||
if (metricsTableMap.get(tbl) == null) {
|
||||
MetricsTableSource tableSource = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsRegionServerSourceFactory.class).createTable(tbl.getNameAsString(),
|
||||
MetricsTableWrapperAggregateImpl.this);
|
||||
CompatibilitySingletonFactory
|
||||
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate()
|
||||
.register(tbl.getNameAsString(), tableSource);
|
||||
}
|
||||
metricsTableMap.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
Set<TableName> existingTableNames = Sets.newHashSet(metricsTableMap.keySet());
|
||||
existingTableNames.removeAll(localMetricsTableMap.keySet());
|
||||
MetricsTableAggregateSource agg = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
|
||||
for (TableName table : existingTableNames) {
|
||||
agg.deregister(table.getNameAsString());
|
||||
if (metricsTableMap.get(table) != null) {
|
||||
metricsTableMap.remove(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRequestsCount(String table) {
|
||||
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||
if (metricsTable == null)
|
||||
return 0;
|
||||
else
|
||||
return metricsTable.getReadRequestsCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRequestsCount(String table) {
|
||||
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||
if (metricsTable == null)
|
||||
return 0;
|
||||
else
|
||||
return metricsTable.getWriteRequestsCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalRequestsCount(String table) {
|
||||
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||
if (metricsTable == null)
|
||||
return 0;
|
||||
else
|
||||
return metricsTable.getTotalRequestsCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
tableMetricsUpdateTask.cancel(true);
|
||||
}
|
||||
|
||||
private static class MetricsTableValues {
|
||||
|
||||
private long totalRequestsCount;
|
||||
private long readRequestsCount;
|
||||
private long writeRequestsCount;
|
||||
|
||||
public long getTotalRequestsCount() {
|
||||
return totalRequestsCount;
|
||||
}
|
||||
|
||||
public void setTotalRequestsCount(long totalRequestsCount) {
|
||||
this.totalRequestsCount = totalRequestsCount;
|
||||
}
|
||||
|
||||
public long getReadRequestsCount() {
|
||||
return readRequestsCount;
|
||||
}
|
||||
|
||||
public void setReadRequestsCount(long readRequestsCount) {
|
||||
this.readRequestsCount = readRequestsCount;
|
||||
}
|
||||
|
||||
public long getWriteRequestsCount() {
|
||||
return writeRequestsCount;
|
||||
}
|
||||
|
||||
public void setWriteRequestsCount(long writeRequestsCount) {
|
||||
this.writeRequestsCount = writeRequestsCount;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
|
||||
|
||||
private String tableName;
|
||||
|
||||
public MetricsTableWrapperStub(String tableName) {
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRequestsCount(String table) {
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRequestsCount(String table) {
|
||||
return 20;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalRequestsCount(String table) {
|
||||
return 30;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.CompatibilityFactory;
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({RegionServerTests.class, SmallTests.class})
|
||||
public class TestMetricsTableAggregate {
|
||||
|
||||
public static MetricsAssertHelper HELPER =
|
||||
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
|
||||
|
||||
@Test
|
||||
public void testTableWrapperAggregateMetrics() throws IOException {
|
||||
String tableName = "testRequestCount";
|
||||
MetricsTableWrapperStub tableWrapper = new MetricsTableWrapperStub(tableName);
|
||||
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
|
||||
.createTable(tableName, tableWrapper);
|
||||
MetricsTableAggregateSource agg = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
|
||||
|
||||
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_readRequestCount", 10, agg);
|
||||
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_writeRequestCount", 20, agg);
|
||||
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_totalRequestCount", 30, agg);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue