HBASE-13965 Stochastic Load Balancer JMX Metrics (Lei Chen)

tedyu 2015-08-03 12:45:38 -07:00
parent 4b6598e394
commit 20d1fa36e7
5 changed files with 468 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
/**
* This interface extends the basic metrics balancer source to add a function
* for reporting metrics related to the stochastic load balancer. The purpose is to
* offer insight into the internal cost calculations, which can be useful when tuning
* the balancer. For details, refer to HBASE-13965.
*/
public interface MetricsStochasticBalancerSource extends MetricsBalancerSource {
/**
* Updates the number of metrics reported to JMX
*/
public void updateMetricsSize(int size);
/**
* Reports stochastic load balancer costs to JMX
*/
public void updateStochasticCost(String tableName, String costFunctionName,
String costFunctionDesc, Double value);
}
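A brief, hedged sketch of how a balancer-side caller might drive this interface. The call site is not part of this hunk, and the table name, cost function name, description, and value below are illustrative only; the factory lookup mirrors the one used in MetricsStochasticBalancer later in this commit.

// Illustrative only: resolve the source and report one cost value.
// (requires org.apache.hadoop.hbase.CompatibilitySingletonFactory)
MetricsStochasticBalancerSource source =
    CompatibilitySingletonFactory.getInstance(MetricsStochasticBalancerSource.class);
source.updateMetricsSize(100);                      // cap the number of exported gauges
source.updateStochasticCost("Table1",               // table the cost was computed for
    "RegionCountSkewCostFunction",                  // cost function name (illustrative)
    "Skew of region counts across servers",         // description shown in JMX (illustrative)
    0.27);                                          // illustrative cost value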

View File

@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
@InterfaceAudience.Private
public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceImpl implements
MetricsStochasticBalancerSource {
private static final String TABLE_FUNCTION_SEP = "_";
// Most Recently Used (MRU) cache: entries are kept in access order and the eldest
// (least recently accessed) table entry is evicted once the cap is exceeded.
private static final float MRU_LOAD_FACTOR = 0.75f;
private int metricsSize = 1000;
private int mruCap = calcMruCap(metricsSize);
private Map<String, Map<String, Double>> stochasticCosts =
new LinkedHashMap<String, Map<String, Double>>(mruCap, MRU_LOAD_FACTOR, true) {
private static final long serialVersionUID = 8204713453436906599L;
@Override
protected boolean removeEldestEntry(Map.Entry<String, Map<String, Double>> eldest) {
return size() > mruCap;
}
};
private Map<String, String> costFunctionDescs = new ConcurrentHashMap<String, String>();
/**
* Calculates the MRU cache capacity from the metrics size.
*/
private static int calcMruCap(int metricsSize) {
return (int) Math.ceil(metricsSize / MRU_LOAD_FACTOR) + 1;
}
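// Worked example with the defaults above (illustration only):
//   metricsSize = 1000, MRU_LOAD_FACTOR = 0.75f
//   mruCap = (int) Math.ceil(1000 / 0.75f) + 1 = 1334 + 1 = 1335
// i.e. the cache starts evicting its least recently accessed table entry once it
// holds more than 1335 entries (see removeEldestEntry above).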
@Override
public void updateMetricsSize(int size) {
if (size > 0) {
metricsSize = size;
mruCap = calcMruCap(size);
}
}
/**
* Reports stochastic load balancer costs to JMX
*/
@Override
public void updateStochasticCost(String tableName, String costFunctionName, String functionDesc,
Double cost) {
if (tableName == null || costFunctionName == null || cost == null) {
return;
}
if (functionDesc != null) {
costFunctionDescs.put(costFunctionName, functionDesc);
}
synchronized (stochasticCosts) {
Map<String, Double> costs = stochasticCosts.get(tableName);
if (costs == null) {
costs = new ConcurrentHashMap<String, Double>();
}
costs.put(costFunctionName, cost);
stochasticCosts.put(tableName, costs);
}
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName);
if (stochasticCosts != null) {
synchronized (stochasticCosts) {
for (Map.Entry<String, Map<String, Double>> tableEntry : stochasticCosts.entrySet()) {
for (Map.Entry<String, Double> costEntry : tableEntry.getValue().entrySet()) {
String attrName = tableEntry.getKey() + TABLE_FUNCTION_SEP + costEntry.getKey();
Double cost = costEntry.getValue();
String functionDesc = costFunctionDescs.get(costEntry.getKey());
if (functionDesc == null) functionDesc = costEntry.getKey();
metricsRecordBuilder.addGauge(Interns.info(attrName, functionDesc), cost);
}
}
}
}
metricsRegistry.snapshot(metricsRecordBuilder, all);
}
}
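The gauges exported by getMetrics() are named tableName + "_" + costFunctionName (see TABLE_FUNCTION_SEP), with the cost function description used as the attribute's help text. As an illustration only, for a table named Table1 and a cost function named RegionCountSkewCostFunction, the exported attributes would look like Table1_RegionCountSkewCostFunction and, for the balancer's combined cost, Table1_Overall.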

View File

@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.hadoop.hbase.master.balancer.MetricsStochasticBalancerSourceImpl
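This single line is the ServiceLoader-style registration (presumably under META-INF/services, keyed by the MetricsStochasticBalancerSource interface name; the file path is not shown in this view) that lets CompatibilitySingletonFactory.getInstance(MetricsStochasticBalancerSource.class), as called from MetricsStochasticBalancer.initSource() below, resolve to MetricsStochasticBalancerSourceImpl.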

View File

@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
/**
* This metrics balancer uses the extended source for the stochastic load balancer
* to report its related metrics to JMX. For details, refer to HBASE-13965.
*/
public class MetricsStochasticBalancer extends MetricsBalancer {
/**
* Use the stochastic source instead of the default source.
*/
private MetricsStochasticBalancerSource stochasticSource = null;
public MetricsStochasticBalancer() {
initSource();
}
/**
* Overrides initSource() in MetricsBalancer so that MetricsStochasticBalancerSource
* is used instead of MetricsBalancerSource.
*/
@Override
protected void initSource() {
stochasticSource =
CompatibilitySingletonFactory.getInstance(MetricsStochasticBalancerSource.class);
}
@Override
public void balanceCluster(long time) {
stochasticSource.updateBalanceCluster(time);
}
@Override
public void incrMiscInvocations() {
stochasticSource.incrMiscInvocations();
}
/**
* Updates the number of metrics reported to JMX
*/
public void updateMetricsSize(int size) {
stochasticSource.updateMetricsSize(size);
}
/**
* Reports stochastic load balancer costs to JMX
*/
public void updateStochasticCost(String tableName, String costFunctionName,
String costFunctionDesc, Double value) {
stochasticSource.updateStochasticCost(tableName, costFunctionName, costFunctionDesc, value);
}
}
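The StochasticLoadBalancer changes that drive this wrapper are not included in the hunks shown above; the following is only a hedged sketch of how the balancer side might feed it. Everything except "Overall", getCostFunctionNames(), and getNameAsString() is an assumed name (tableName, overallCost, costFunctionNames, and costOf() are hypothetical).

// Hypothetical balancer-side call site (names assumed, not taken from this commit).
MetricsStochasticBalancer metricsBalancer = new MetricsStochasticBalancer();
String table = tableName.getNameAsString();
// Report the combined cost first...
metricsBalancer.updateStochasticCost(table, "Overall", "Overall cost", overallCost);
// ...then one gauge per cost function, e.g. the names from getCostFunctionNames().
for (String functionName : costFunctionNames) {
  metricsBalancer.updateStochasticCost(table, functionName,
      "Cost reported by " + functionName, costOf(functionName)); // costOf() is hypothetical
}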

View File

@@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.balancer.BalancerTestBase;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.MethodSorters;
@Category({ MiscTests.class, MediumTests.class })
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
private static final Log LOG = LogFactory.getLog(TestStochasticBalancerJmxMetrics.class);
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static int connectorPort = 61120;
private static StochasticLoadBalancer loadBalancer;
/**
* A simple mock cluster for testing JMX.
*/
private static int[] mockCluster_ensemble = new int[] { 0, 1, 2, 3 };
private static int[] mockCluster_pertable_1 = new int[] { 0, 1, 2 };
private static int[] mockCluster_pertable_2 = new int[] { 3, 1, 1 };
private static int[] mockCluster_pertable_namespace = new int[] { 1, 3, 1 };
private static final String TABLE_NAME_1 = "Table1";
private static final String TABLE_NAME_2 = "Table2";
private static final String TABLE_NAME_NAMESPACE = "hbase:namespace";
private static Configuration conf = null;
/**
* Sets up the environment for the test.
*/
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf = UTIL.getConfiguration();
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
conf.setInt("regionserver.rmi.registry.port", connectorPort);
UTIL.startMiniCluster();
}
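// The JMXListener coprocessor registered above starts a JMX RMI connector on
// connectorPort (61120); readJmxMetrics() below connects to it through
// JMXListener.buildJMXServiceURL(). Since the mini cluster runs the master and
// region servers in a single JVM, the master's Balancer MBean is presumably
// visible through that same connector.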
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
/**
* In ensemble mode, there should be only one ensemble table.
*/
@Test
public void testJmxMetrics_EnsembleMode() throws Exception {
loadBalancer = new StochasticLoadBalancer();
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
loadBalancer.setConf(conf);
TableName tableName = TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
loadBalancer.balanceCluster(tableName, clusterState);
String[] tableNames = new String[] { tableName.getNameAsString() };
String[] functionNames = loadBalancer.getCostFunctionNames();
Set<String> jmxMetrics = readJmxMetrics();
Set<String> expectedMetrics = getExpectedJmxMetrics(tableNames, functionNames);
// printMetrics(jmxMetrics, "existing metrics in ensemble mode");
// printMetrics(expectedMetrics, "expected metrics in ensemble mode");
// assert that every expected is in the JMX
for (String expected : expectedMetrics) {
assertTrue("Metric " + expected + " can not be found in JMX in ensemble mode.",
jmxMetrics.contains(expected));
}
}
/**
* In per-table mode, each table has its own set of metrics.
*/
@Test
public void testJmxMetrics_PerTableMode() throws Exception {
loadBalancer = new StochasticLoadBalancer();
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, true);
loadBalancer.setConf(conf);
// NOTE: the size is normally set in setClusterStatus(); for test purposes we set it manually
// Tables: hbase:namespace, table1, table2
// Functions: costFunctions, overall
String[] functionNames = loadBalancer.getCostFunctionNames();
loadBalancer.updateMetricsSize(3 * (functionNames.length + 1));
// table 1
TableName tableName = TableName.valueOf(TABLE_NAME_1);
Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(mockCluster_pertable_1);
loadBalancer.balanceCluster(tableName, clusterState);
// table 2
tableName = TableName.valueOf(TABLE_NAME_2);
clusterState = mockClusterServers(mockCluster_pertable_2);
loadBalancer.balanceCluster(tableName, clusterState);
// table hbase:namespace
tableName = TableName.valueOf(TABLE_NAME_NAMESPACE);
clusterState = mockClusterServers(mockCluster_pertable_namespace);
loadBalancer.balanceCluster(tableName, clusterState);
String[] tableNames = new String[] { TABLE_NAME_1, TABLE_NAME_2, TABLE_NAME_NAMESPACE };
Set<String> jmxMetrics = readJmxMetrics();
Set<String> expectedMetrics = getExpectedJmxMetrics(tableNames, functionNames);
// printMetrics(jmxMetrics, "existing metrics in per-table mode");
// printMetrics(expectedMetrics, "expected metrics in per-table mode");
// assert that every expected is in the JMX
for (String expected : expectedMetrics) {
assertTrue("Metric " + expected + " can not be found in JMX in per-table mode.",
jmxMetrics.contains(expected));
}
}
/**
* Reads the attributes of the Balancer MBean (domain Hadoop, service=HBase, name=Master, sub=Balancer) via JMX.
*/
private Set<String> readJmxMetrics() {
JMXConnector connector = null;
try {
connector =
JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
MBeanServerConnection mb = connector.getMBeanServerConnection();
Hashtable<String, String> pairs = new Hashtable<>();
pairs.put("service", "HBase");
pairs.put("name", "Master");
pairs.put("sub", "Balancer");
ObjectName target = new ObjectName("Hadoop", pairs);
MBeanInfo beanInfo = mb.getMBeanInfo(target);
Set<String> existingAttrs = new HashSet<String>();
for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
existingAttrs.add(attrInfo.getName());
}
return existingAttrs;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connector != null) {
try {
connector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
return null;
}
/**
* Given the tables and cost functions, returns the metric names that should exist in JMX.
*/
private Set<String> getExpectedJmxMetrics(String[] tableNames, String[] functionNames) {
Set<String> ret = new HashSet<String>();
for (String tableName : tableNames) {
ret.add(StochasticLoadBalancer.composeAttributeName(tableName, "Overall"));
for (String functionName : functionNames) {
String metricsName = StochasticLoadBalancer.composeAttributeName(tableName, functionName);
ret.add(metricsName);
}
}
return ret;
}
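// For example, with TABLE_NAME_1 = "Table1" and a cost function named
// RegionCountSkewCostFunction, this set would contain "Table1_Overall" and
// "Table1_RegionCountSkewCostFunction", assuming composeAttributeName() joins
// its arguments with "_" as MetricsStochasticBalancerSourceImpl does.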
private static void printMetrics(Set<String> metrics, String info) {
if (null != info) LOG.info("++++ ------ " + info + " ------");
LOG.info("++++ metrics count = " + metrics.size());
for (String str : metrics) {
LOG.info(" ++++ " + str);
}
}
}