HBASE-15518 Add Per-Table metrics back (Alicia Ying Shu)

This commit is contained in:
Enis Soztutar 2016-04-20 14:35:45 -07:00
parent 71c55d182d
commit 1311e25171
13 changed files with 840 additions and 1 deletions

View File

@ -34,8 +34,24 @@ public interface MetricsRegionServerSourceFactory {
/** /**
* Create a MetricsRegionSource from a MetricsRegionWrapper. * Create a MetricsRegionSource from a MetricsRegionWrapper.
* *
* @param wrapper * @param wrapper The wrapped region
* @return A metrics region source * @return A metrics region source
*/ */
MetricsRegionSource createRegion(MetricsRegionWrapper wrapper); MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
/**
* Create a MetricsTableSource from a MetricsTableWrapper.
*
* @param table The table name
* @param wrapper The wrapped table aggregate
* @return A metrics table source
*/
MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper);
/**
* Get a MetricsTableAggregateSource
*
* @return A metrics table aggregate source
*/
MetricsTableAggregateSource getTableAggregate();
} }

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.metrics.BaseSource;
/**
* This interface will be implemented by a MetricsSource that will export metrics from
* multiple regions of a table into the hadoop metrics system.
*/
public interface MetricsTableAggregateSource extends BaseSource {
/**
* The name of the metrics
*/
String METRICS_NAME = "Tables";
/**
* The name of the metrics context that metrics will be under.
*/
String METRICS_CONTEXT = "regionserver";
/**
* Description
*/
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer tables";
/**
* The name of the metrics context that metrics will be under in jmx
*/
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
String NUM_TABLES = "numTables";
String NUMBER_OF_TABLES_DESC = "Number of tables in the metrics system";
/**
* Register a MetricsTableSource as being open.
*
* @param table The table name
* @param source the source for the table being opened.
*/
void register(String table, MetricsTableSource source);
/**
* Remove a table's source. This is called when regions of a table are closed.
*
* @param table The table name
*/
void deregister(String table);
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
/**
* This interface will be implemented to allow region server to push table metrics into
* MetricsRegionAggregateSource that will in turn push data to the Hadoop metrics system.
*/
public interface MetricsTableSource extends Comparable<MetricsTableSource> {
String READ_REQUEST_COUNT = "readRequestCount";
String READ_REQUEST_COUNT_DESC = "Number fo read requests";
String WRITE_REQUEST_COUNT = "writeRequestCount";
String WRITE_REQUEST_COUNT_DESC = "Number fo write requests";
String TOTAL_REQUEST_COUNT = "totalRequestCount";
String TOTAL_REQUEST_COUNT_DESC = "Number fo total requests";
String getTableName();
/**
* Close the table's metrics as all the region are closing.
*/
void close();
/**
* Get the aggregate source to which this reports.
*/
MetricsTableAggregateSource getAggregateSource();
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
/**
* Interface of class that will wrap a MetricsTableSource and export numbers so they can be
* used in MetricsTableSource
*/
public interface MetricsTableWrapperAggregate {
/**
* Get the number of read requests that have been issued against this table
*/
long getReadRequestsCount(String table);
/**
* Get the number of write requests that have been issued against this table
*/
long getWriteRequestsCount(String table);
/**
* Get the total number of requests that have been issued against this table
*/
long getTotalRequestsCount(String table);
}

View File

@ -29,6 +29,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
INSTANCE; INSTANCE;
private Object aggLock = new Object(); private Object aggLock = new Object();
private MetricsRegionAggregateSourceImpl aggImpl; private MetricsRegionAggregateSourceImpl aggImpl;
private MetricsTableAggregateSourceImpl tblAggImpl;
} }
private synchronized MetricsRegionAggregateSourceImpl getAggregate() { private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
@ -40,6 +41,15 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
} }
} }
@Override
public synchronized MetricsTableAggregateSourceImpl getTableAggregate() {
synchronized (FactoryStorage.INSTANCE.aggLock) {
if (FactoryStorage.INSTANCE.tblAggImpl == null) {
FactoryStorage.INSTANCE.tblAggImpl = new MetricsTableAggregateSourceImpl();
}
return FactoryStorage.INSTANCE.tblAggImpl;
}
}
@Override @Override
public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) { public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) {
@ -50,4 +60,9 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) { public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
return new MetricsRegionSourceImpl(wrapper, getAggregate()); return new MetricsRegionSourceImpl(wrapper, getAggregate());
} }
@Override
public MetricsTableSource createTable(String table, MetricsTableWrapperAggregate wrapper) {
return new MetricsTableSourceImpl(table, getTableAggregate(), wrapper);
}
} }

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
@InterfaceAudience.Private
public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
implements MetricsTableAggregateSource {
private static final Log LOG = LogFactory.getLog(MetricsTableAggregateSourceImpl.class);
private ConcurrentHashMap<String, MetricsTableSource> tableSources = new ConcurrentHashMap<>();
public MetricsTableAggregateSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}
public MetricsTableAggregateSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
}
@Override
public void register(String table, MetricsTableSource source) {
tableSources.put(table, source);
}
@Override
public void deregister(String table) {
try {
tableSources.remove(table);
} catch (Exception e) {
// Ignored. If this errors out it means that someone is double
// closing the region source and the region is already nulled out.
LOG.info(
"Error trying to remove " + table + " from " + this.getClass().getSimpleName(),
e);
}
}
/**
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
* expectations of java programmers. Instead of returning anything Hadoop metrics expects
* getMetrics to push the metrics into the collector.
*
* @param collector the collector
* @param all get all the metrics regardless of when they last changed.
*/
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
if (tableSources != null) {
for (MetricsTableSource tableMetricSource : tableSources.values()) {
if (tableMetricSource instanceof MetricsTableSourceImpl) {
((MetricsTableSourceImpl) tableMetricSource).snapshot(mrb, all);
}
}
mrb.addGauge(Interns.info(NUM_TABLES, NUMBER_OF_TABLES_DESC), tableSources.size());
metricsRegistry.snapshot(mrb, all);
}
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.Interns;
@InterfaceAudience.Private
public class MetricsTableSourceImpl implements MetricsTableSource {
private static final Log LOG = LogFactory.getLog(MetricsTableSourceImpl.class);
private AtomicBoolean closed = new AtomicBoolean(false);
// Non-final so that we can null out the wrapper
// This is just paranoia. We really really don't want to
// leak a whole table by way of keeping the
// tableWrapper around too long.
private MetricsTableWrapperAggregate tableWrapperAgg;
private final MetricsTableAggregateSourceImpl agg;
private final DynamicMetricsRegistry registry;
private final String tableNamePrefix;
private final TableName tableName;
private final int hashCode;
public MetricsTableSourceImpl(String tblName,
MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
LOG.debug("Creating new MetricsTableSourceImpl for table ");
this.tableName = TableName.valueOf(tblName);
this.agg = aggregate;
agg.register(tblName, this);
this.tableWrapperAgg = tblWrapperAgg;
this.registry = agg.getMetricsRegistry();
this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
"_table_" + this.tableName.getQualifierAsString() + "_metric_";
this.hashCode = this.tableName.hashCode();
}
@Override
public void close() {
boolean wasClosed = closed.getAndSet(true);
// Has someone else already closed this for us?
if (wasClosed) {
return;
}
// Before removing the metrics remove this table from the aggregate table bean.
// This should mean that it's unlikely that snapshot and close happen at the same time.
agg.deregister(tableName.getNameAsString());
// While it's un-likely that snapshot and close happen at the same time it's still possible.
// So grab the lock to ensure that all calls to snapshot are done before we remove the metrics
synchronized (this) {
if (LOG.isTraceEnabled()) {
LOG.trace("Removing table Metrics for table ");
}
tableWrapperAgg = null;
}
}
@Override
public MetricsTableAggregateSource getAggregateSource() {
return agg;
}
@Override
public int compareTo(MetricsTableSource source) {
if (!(source instanceof MetricsTableSourceImpl)) {
return -1;
}
MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source;
if (impl == null) {
return -1;
}
return Long.compare(hashCode, impl.hashCode);
}
void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
// If there is a close that started be double extra sure
// that we're not getting any locks and not putting data
// into the metrics that should be removed. So early out
// before even getting the lock.
if (closed.get()) {
return;
}
// Grab the read
// This ensures that removes of the metrics
// can't happen while we are putting them back in.
synchronized (this) {
// It's possible that a close happened between checking
// the closed variable and getting the lock.
if (closed.get()) {
return;
}
if (this.tableWrapperAgg != null) {
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.READ_REQUEST_COUNT,
MetricsTableSource.READ_REQUEST_COUNT_DESC),
tableWrapperAgg.getReadRequestsCount(tableName.getNameAsString()));
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.WRITE_REQUEST_COUNT,
MetricsTableSource.WRITE_REQUEST_COUNT_DESC),
tableWrapperAgg.getWriteRequestsCount(tableName.getNameAsString()));
mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.TOTAL_REQUEST_COUNT,
MetricsTableSource.TOTAL_REQUEST_COUNT_DESC),
tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
}
}
}
@Override
public String getTableName() {
return tableName.getNameAsString();
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
return (o instanceof MetricsTableSourceImpl && compareTo((MetricsTableSourceImpl) o) == 0);
}
public MetricsTableWrapperAggregate getTableWrapper() {
return tableWrapperAgg;
}
public String getTableNamePrefix() {
return tableNamePrefix;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for MetricsTableSourceImpl
*/
@Category({MetricsTests.class, SmallTests.class})
public class TestMetricsTableSourceImpl {
@Test
public void testCompareToHashCode() throws Exception {
MetricsRegionServerSourceFactory metricsFact =
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
MetricsTableSource one = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
MetricsTableSource oneClone = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
MetricsTableSource two = metricsFact.createTable("TWOTABLE", new TableWrapperStub("TWOTABLE"));
assertEquals(0, one.compareTo(oneClone));
assertEquals(one.hashCode(), oneClone.hashCode());
assertNotEquals(one, two);
assertTrue(one.compareTo(two) != 0);
assertTrue(two.compareTo(one) != 0);
assertTrue(two.compareTo(one) != one.compareTo(two));
assertTrue(two.compareTo(two) == 0);
}
@Test(expected = RuntimeException.class)
public void testNoGetTableMetricsSourceImpl() throws Exception {
// This should throw an exception because MetricsTableSourceImpl should only
// be created by a factory.
CompatibilitySingletonFactory.getInstance(MetricsTableSourceImpl.class);
}
@Test
public void testGetTableMetrics() throws Exception{
MetricsTableSource oneTbl =
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
assertEquals("ONETABLE", oneTbl.getTableName());
}
static class TableWrapperStub implements MetricsTableWrapperAggregate {
private String tableName;
public TableWrapperStub(String tableName) {
this.tableName = tableName;
}
@Override
public long getReadRequestsCount(String table) {
return 10;
}
@Override
public long getWriteRequestsCount(String table) {
return 20;
}
@Override
public long getTotalRequestsCount(String table) {
return 30;
}
public String getTableName() {
return tableName;
}
}
}

View File

@ -340,6 +340,7 @@ public class HRegionServer extends HasThread implements
public static final String REGIONSERVER = "regionserver"; public static final String REGIONSERVER = "regionserver";
MetricsRegionServer metricsRegionServer; MetricsRegionServer metricsRegionServer;
MetricsTable metricsTable;
private SpanReceiverHost spanReceiverHost; private SpanReceiverHost spanReceiverHost;
/** /**
@ -1399,6 +1400,7 @@ public class HRegionServer extends HasThread implements
this.walFactory = setupWALAndReplication(); this.walFactory = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set // Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
startServiceThreads(); startServiceThreads();
startHeapMemoryManager(); startHeapMemoryManager();

View File

@ -0,0 +1,42 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class MetricsTable {
private final MetricsTableAggregateSource tableSourceAgg;
private MetricsTableWrapperAggregate tableWrapperAgg;
public MetricsTable(final MetricsTableWrapperAggregate wrapper) {
tableSourceAgg = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.getTableAggregate();
this.tableWrapperAgg = wrapper;
}
public MetricsTableWrapperAggregate getTableWrapperAgg() {
return tableWrapperAgg;
}
public MetricsTableAggregateSource getTableSourceAgg() {
return tableSourceAgg;
}
}

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsExecutor;
import com.google.common.collect.Sets;
@InterfaceAudience.Private
public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggregate, Closeable {
private final HRegionServer regionServer;
private ScheduledExecutorService executor;
private Runnable runnable;
private long period;
private ScheduledFuture<?> tableMetricsUpdateTask;
private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap = new ConcurrentHashMap<>();
public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
this.runnable = new TableMetricsWrapperRunnable();
this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, this.period,
TimeUnit.MILLISECONDS);
}
public class TableMetricsWrapperRunnable implements Runnable {
@Override
public void run() {
Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
for (Region r : regionServer.getOnlineRegionsLocalContext()) {
TableName tbl= r.getTableDesc().getTableName();
MetricsTableValues metricsTable = localMetricsTableMap.get(tbl);
if (metricsTable == null) {
metricsTable = new MetricsTableValues();
localMetricsTableMap.put(tbl, metricsTable);
}
metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());
metricsTable.setWriteRequestsCount(metricsTable.getWriteRequestsCount() + r.getWriteRequestsCount());
metricsTable.setTotalRequestsCount(metricsTable.getReadRequestsCount() + metricsTable.getWriteRequestsCount());
}
for(Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
TableName tbl = entry.getKey();
if (metricsTableMap.get(tbl) == null) {
MetricsTableSource tableSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createTable(tbl.getNameAsString(),
MetricsTableWrapperAggregateImpl.this);
CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate()
.register(tbl.getNameAsString(), tableSource);
}
metricsTableMap.put(entry.getKey(), entry.getValue());
}
Set<TableName> existingTableNames = Sets.newHashSet(metricsTableMap.keySet());
existingTableNames.removeAll(localMetricsTableMap.keySet());
MetricsTableAggregateSource agg = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
for (TableName table : existingTableNames) {
agg.deregister(table.getNameAsString());
if (metricsTableMap.get(table) != null) {
metricsTableMap.remove(table);
}
}
}
}
@Override
public long getReadRequestsCount(String table) {
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
if (metricsTable == null)
return 0;
else
return metricsTable.getReadRequestsCount();
}
@Override
public long getWriteRequestsCount(String table) {
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
if (metricsTable == null)
return 0;
else
return metricsTable.getWriteRequestsCount();
}
@Override
public long getTotalRequestsCount(String table) {
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
if (metricsTable == null)
return 0;
else
return metricsTable.getTotalRequestsCount();
}
@Override
public void close() throws IOException {
tableMetricsUpdateTask.cancel(true);
}
private static class MetricsTableValues {
private long totalRequestsCount;
private long readRequestsCount;
private long writeRequestsCount;
public long getTotalRequestsCount() {
return totalRequestsCount;
}
public void setTotalRequestsCount(long totalRequestsCount) {
this.totalRequestsCount = totalRequestsCount;
}
public long getReadRequestsCount() {
return readRequestsCount;
}
public void setReadRequestsCount(long readRequestsCount) {
this.readRequestsCount = readRequestsCount;
}
public long getWriteRequestsCount() {
return writeRequestsCount;
}
public void setWriteRequestsCount(long writeRequestsCount) {
this.writeRequestsCount = writeRequestsCount;
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
private String tableName;
public MetricsTableWrapperStub(String tableName) {
this.tableName = tableName;
}
@Override
public long getReadRequestsCount(String table) {
return 10;
}
@Override
public long getWriteRequestsCount(String table) {
return 20;
}
@Override
public long getTotalRequestsCount(String table) {
return 30;
}
public String getTableName() {
return tableName;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMetricsTableAggregate {
public static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
@Test
public void testTableWrapperAggregateMetrics() throws IOException {
String tableName = "testRequestCount";
MetricsTableWrapperStub tableWrapper = new MetricsTableWrapperStub(tableName);
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createTable(tableName, tableWrapper);
MetricsTableAggregateSource agg = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_readRequestCount", 10, agg);
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_writeRequestCount", 20, agg);
HELPER.assertCounter("Namespace_default_table_testRequestCount_metric_totalRequestCount", 30, agg);
}
}