HBASE-22637 fix flaky test in TestMetaTableMetrics

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
This commit is contained in:
Mate Szalay-Beko 2019-07-03 13:57:16 +02:00 committed by Peter Somogyi
parent 7828d6a106
commit a09c63ef26

View File

@ -11,17 +11,18 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -37,13 +38,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.JMXListener; import org.apache.hadoop.hbase.JMXListener;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.hamcrest.CustomTypeSafeMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.core.AllOf;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -63,18 +67,20 @@ public class TestMetaTableMetrics {
private static final byte[] QUALIFIER = Bytes.toBytes("q"); private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final int NUM_ROWS = 5; private static final int NUM_ROWS = 5;
private static final String value = "foo"; private static final String value = "foo";
private static Configuration conf = null; private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_";
private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES =
Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate");
private static int connectorPort = 61120; private static int connectorPort = 61120;
final byte[] cf = Bytes.toBytes("info"); private final byte[] cf = Bytes.toBytes("info");
final byte[] col = Bytes.toBytes("any"); private final byte[] col = Bytes.toBytes("any");
byte[] tablename; private byte[] tablename;
final int nthreads = 20; private final int nthreads = 20;
@BeforeClass @BeforeClass
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
conf = UTIL.getConfiguration(); Configuration conf = UTIL.getConfiguration();
// Set system coprocessor so it can be applied to meta regions // Set system coprocessor so it can be applied to meta regions
UTIL.getConfiguration().set("hbase.coprocessor.region.classes", UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
MetaTableMetrics.class.getName()); MetaTableMetrics.class.getName());
@ -88,11 +94,9 @@ public class TestMetaTableMetrics {
try { try {
conf.setInt("regionserver.rmi.registry.port", connectorPort); conf.setInt("regionserver.rmi.registry.port", connectorPort);
UTIL.startMiniCluster(1); UTIL.startMiniCluster(1);
UTIL.createTable(NAME1, new byte[][]{FAMILY});
LOG.error("util to string" + UTIL.toString());
break; break;
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Encountered exception when starting cluster. Trying port " + connectorPort, e); LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e);
try { try {
// this is to avoid "IllegalStateException: A mini-cluster is already running" // this is to avoid "IllegalStateException: A mini-cluster is already running"
UTIL.shutdownMiniCluster(); UTIL.shutdownMiniCluster();
@ -108,7 +112,73 @@ public class TestMetaTableMetrics {
UTIL.shutdownMiniCluster(); UTIL.shutdownMiniCluster();
} }
private void writeData(Table t) throws IOException { // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single
// client: 9 metrics
// are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one
// table, there should
// be 5 MetaTable_table_<TableName>_request attributes, such as:
// - MetaTable_table_TestExampleMetaTableMetricsOne_request_count
// - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
// - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
// - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
// - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
@Test
public void testMetaTableMetricsInJmx() throws Exception {
UTIL.createTable(NAME1, new byte[][]{FAMILY});
writeData(NAME1);
readingSingleRowFromTheMetaTable();
UTIL.deleteTable(NAME1);
UTIL.waitFor(30000, 2000, true, new Predicate<IOException>() {
@Override
public boolean evaluate() throws IOException {
Map<String, Double> jmxMetrics = readMetaTableJmxMetrics();
boolean allMetricsFound = AllOf.allOf(
containsPositiveJmxAttributesFor("MetaTable_get_request"),
containsPositiveJmxAttributesFor("MetaTable_put_request"),
containsPositiveJmxAttributesFor("MetaTable_delete_request"),
containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"),
containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"),
containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"),
containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"),
containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"),
containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request")
).matches(jmxMetrics);
if (allMetricsFound) {
LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics);
} else {
LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics);
}
return allMetricsFound;
}
});
}
@Test
public void testConcurrentAccess() {
try {
tablename = Bytes.toBytes("hbase:meta");
int numRows = 3000;
int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
putData(numRows);
Thread.sleep(2000);
int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
getData(numRows);
} catch (InterruptedException e) {
LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage());
fail();
} catch (IOException e) {
LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage());
fail();
}
}
private void writeData(TableName tableName) throws IOException {
try (Table t = UTIL.getConnection().getTable(tableName)) {
List<Put> puts = new ArrayList<>(NUM_ROWS); List<Put> puts = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) { for (int i = 0; i < NUM_ROWS; i++) {
Put p = new Put(Bytes.toBytes(i + 1)); Put p = new Put(Bytes.toBytes(i + 1));
@ -117,26 +187,39 @@ public class TestMetaTableMetrics {
} }
t.put(puts); t.put(puts);
} }
}
private Set<String> readJmxMetricsWithRetry() throws IOException { private void readingSingleRowFromTheMetaTable() throws IOException {
final int count = 0; TableName metaTableName = TableName.valueOf(Bytes.toBytes("hbase:meta"));
for (int i = 0; i < 10; i++) { try (Table metaTable = UTIL.getConnection().getTable(metaTableName)) {
Set<String> metrics = readJmxMetrics(); Get get = new Get(Bytes.toBytes(1));
if (metrics != null) { metaTable.get(get);
return metrics;
} }
LOG.warn("Failed to get jmxmetrics... sleeping, retrying; " + i + " of " + count + " times");
Threads.sleep(1000);
} }
return null;
private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) {
return new CustomTypeSafeMatcher<Map<String, Double>>(
"failed to find all the 5 positive JMX attributes for: " + regexp) {
@Override
protected boolean matchesSafely(final Map<String, Double> values) {
for (String key : values.keySet()) {
for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) {
if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) {
return true;
}
}
}
return false;
}
};
} }
/** /**
* Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
* @throws IOException when fails to retrieve jmx metrics. * @throws IOException when fails to retrieve jmx metrics.
*/ */
// this method comes from this class: TestStochasticBalancerJmxMetrics with minor modifications. private Map<String, Double> readMetaTableJmxMetrics() throws IOException {
private Set<String> readJmxMetrics() throws IOException {
JMXConnector connector = null; JMXConnector connector = null;
ObjectName target = null; ObjectName target = null;
MBeanServerConnection mb = null; MBeanServerConnection mb = null;
@ -150,26 +233,30 @@ public class TestMetaTableMetrics {
pairs.put("service", "HBase"); pairs.put("service", "HBase");
pairs.put("name", "RegionServer"); pairs.put("name", "RegionServer");
pairs.put("sub", pairs.put("sub",
"Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor" "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics");
+ ".MetaTableMetrics");
target = new ObjectName("Hadoop", pairs); target = new ObjectName("Hadoop", pairs);
MBeanInfo beanInfo = mb.getMBeanInfo(target); MBeanInfo beanInfo = mb.getMBeanInfo(target);
Set<String> existingAttrs = new HashSet<>(); Map<String, Double> existingAttrs = new HashMap<>();
for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) { for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
existingAttrs.add(attrInfo.getName()); Object value = mb.getAttribute(target, attrInfo.getName());
if (attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX)
&& value instanceof Number) {
existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString()));
} }
}
LOG.info("MBean Found: {}", target);
return existingAttrs; return existingAttrs;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to get bean." + target, e); LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e);
if (mb != null) { if (mb != null) {
Set<ObjectInstance> instances = mb.queryMBeans(null, null); Set<ObjectInstance> instances = mb.queryMBeans(null, null);
Iterator<ObjectInstance> iterator = instances.iterator(); Iterator<ObjectInstance> iterator = instances.iterator();
LOG.warn("MBean Found:"); LOG.debug("All the MBeans we found:");
while (iterator.hasNext()) { while (iterator.hasNext()) {
ObjectInstance instance = iterator.next(); ObjectInstance instance = iterator.next();
LOG.warn("Class Name: " + instance.getClassName()); LOG.debug("Class and object name: {} [{}]", instance.getClassName(),
LOG.warn("Object Name: " + instance.getObjectName()); instance.getObjectName());
} }
} }
} finally { } finally {
@ -181,75 +268,11 @@ public class TestMetaTableMetrics {
} }
} }
} }
return null; return Collections.emptyMap();
} }
// verifies meta table metrics exist from jmx private void putData(int nrows) throws InterruptedException {
// for one table, there should be 5 MetaTable_table_<TableName> metrics. LOG.info("Putting {} rows in hbase:meta", nrows);
// such as:
// [Time-limited test] example.TestMetaTableMetrics(204): ==
// MetaTable_table_TestExampleMetaTableMetricsOne_request_count
// [Time-limited test] example.TestMetaTableMetrics(204): ==
// MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
// [Time-limited test] example.TestMetaTableMetrics(204): ==
// MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
// [Time-limited test] example.TestMetaTableMetrics(204): ==
// MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
// [Time-limited test] example.TestMetaTableMetrics(204): ==
// MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
@Test
public void test() throws IOException, InterruptedException {
try (Table t = UTIL.getConnection().getTable(NAME1)) {
writeData(t);
// Flush the data
UTIL.flush(NAME1);
// Issue a compaction
UTIL.compact(NAME1, true);
Thread.sleep(2000);
}
Set<String> jmxMetrics = readJmxMetricsWithRetry();
assertNotNull(jmxMetrics);
long name1TableMetricsCount = 0;
for(String metric : jmxMetrics) {
if (metric.contains("MetaTable_table_" + NAME1)){
name1TableMetricsCount++;
}
}
assertEquals(5L, name1TableMetricsCount);
String putWithClientMetricNameRegex = "MetaTable_client_.+_put_request.*";
long putWithClientMetricsCount = 0;
for(String metric : jmxMetrics) {
if(metric.matches(putWithClientMetricNameRegex)) {
putWithClientMetricsCount++;
}
}
assertEquals(5L, putWithClientMetricsCount);
}
@Test(timeout = 30000)
public void testConcurrentAccess() {
try {
tablename = Bytes.toBytes("hbase:meta");
int numRows = 3000;
int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
putData(numRows);
Thread.sleep(2000);
int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
getData(numRows);
} catch (InterruptedException e) {
LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
fail();
} catch (IOException e) {
LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
fail();
}
}
public void putData(int nrows) throws InterruptedException {
LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
Thread[] threads = new Thread[nthreads]; Thread[] threads = new Thread[nthreads];
for (int i = 1; i <= nthreads; i++) { for (int i = 1; i <= nthreads; i++) {
threads[i - 1] = new PutThread(1, nrows); threads[i - 1] = new PutThread(1, nrows);
@ -257,8 +280,8 @@ public class TestMetaTableMetrics {
startThreadsAndWaitToJoin(threads); startThreadsAndWaitToJoin(threads);
} }
public void getData(int nrows) throws InterruptedException { private void getData(int nrows) throws InterruptedException {
LOG.info(String.format("Getting %d rows from hbase:meta", nrows)); LOG.info("Getting {} rows from hbase:meta", nrows);
Thread[] threads = new Thread[nthreads]; Thread[] threads = new Thread[nthreads];
for (int i = 1; i <= nthreads; i++) { for (int i = 1; i <= nthreads; i++) {
threads[i - 1] = new GetThread(1, nrows); threads[i - 1] = new GetThread(1, nrows);
@ -275,11 +298,11 @@ public class TestMetaTableMetrics {
} }
} }
class PutThread extends Thread { private class PutThread extends Thread {
int start; int start;
int end; int end;
public PutThread(int start, int end) { PutThread(int start, int end) {
this.start = start; this.start = start;
this.end = end; this.end = end;
} }
@ -293,16 +316,16 @@ public class TestMetaTableMetrics {
table.put(p); table.put(p);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.info("Caught IOException while PutThread operation: " + e.getMessage()); LOG.warn("Caught IOException while PutThread operation", e);
} }
} }
} }
class GetThread extends Thread { private class GetThread extends Thread {
int start; int start;
int end; int end;
public GetThread(int start, int end) { GetThread(int start, int end) {
this.start = start; this.start = start;
this.end = end; this.end = end;
} }
@ -315,7 +338,7 @@ public class TestMetaTableMetrics {
table.get(get); table.get(get);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.info("Caught IOException while GetThread operation: " + e.getMessage()); LOG.warn("Caught IOException while GetThread operation", e);
} }
} }
} }