HBASE-8209. Improve LoadTest extensibility

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1462611 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2013-03-29 19:11:24 +00:00
parent 00f7bb46c5
commit b73cacfd8d
11 changed files with 142 additions and 84 deletions

HBaseTestingUtility.java

@@ -2164,29 +2164,45 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @param countOfRegions How many regions in .META.
    * @throws IOException
    */
-  public void waitUntilAllRegionsAssigned(final int countOfRegions)
+  public void waitUntilAllRegionsAssigned(final byte[] tableName, final int countOfRegions)
   throws IOException {
+    int retries = 30; // We may wait up to 30 seconds
+    int rows = 0;
     HTable meta = new HTable(getConfiguration(), HConstants.META_TABLE_NAME);
-    while (true) {
-      int rows = 0;
-      Scan scan = new Scan();
-      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-      ResultScanner s = meta.getScanner(scan);
-      for (Result r = null; (r = s.next()) != null;) {
-        byte [] b =
-          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
-        if (b == null || b.length <= 0) {
-          break;
-        }
-        rows++;
-      }
-      s.close();
-      // If I get to here and all rows have a Server, then all have been assigned.
-      if (rows == countOfRegions) {
-        break;
-      }
-      LOG.info("Found=" + rows);
-      Threads.sleep(200);
-    }
+    try {
+      do {
+        Scan scan = new Scan();
+        scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+        scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+        ResultScanner s = meta.getScanner(scan);
+        try {
+          for (Result r = null; (r = s.next()) != null;) {
+            byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+            HRegionInfo hri = HRegionInfo.parseFromOrNull(b);
+            if (hri != null && Bytes.equals(hri.getTableName(), tableName)) {
+              b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+              if (b == null || b.length <= 0) {
+                continue;
+              }
+              rows++;
+            }
+          }
+        } finally {
+          s.close();
+        }
+        // If I get to here and all rows have a Server, then all have been assigned.
+        if (rows == countOfRegions) {
+          break;
+        }
+        LOG.info("Found=" + rows);
+        Threads.sleep(1000);
+      } while (--retries > 0);
+    } finally {
+      meta.close();
+    }
+    if (rows != countOfRegions) {
+      throw new IOException("Timed out waiting for " + countOfRegions + " regions of " +
+          Bytes.toStringBinary(tableName) + " to come online");
+    }
   }
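The reworked helper now identifies a region's table from the HRegionInfo stored in .META., counts only the regions of the requested table, and throws instead of looping forever when assignment stalls. A minimal sketch of the new call pattern from a test, using only the APIs touched by this patch (table and family names here are illustrative):

    byte[] tableName = Bytes.toBytes("myTable");
    byte[] family = Bytes.toBytes("cf");
    HTable table = TEST_UTIL.createTable(tableName, family);
    int countOfRegions = TEST_UTIL.createMultiRegions(table, family);
    // Waits up to ~30s for this table's regions to get a server assignment;
    // throws IOException on timeout instead of spinning indefinitely.
    TEST_UTIL.waitUntilAllRegionsAssigned(tableName, countOfRegions);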
@@ -2485,12 +2501,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
     hcd.setDataBlockEncoding(dataBlockEncoding);
     hcd.setCompressionType(compression);
-    desc.addFamily(hcd);
+    return createPreSplitLoadTestTable(conf, desc, hcd);
+  }
+
+  /**
+   * Creates a pre-split table for load testing. If the table already exists,
+   * logs a warning and continues.
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
+      HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
+    if (!desc.hasFamily(hcd.getName())) {
+      desc.addFamily(hcd);
+    }
     int totalNumberOfRegions = 0;
+    HBaseAdmin admin = new HBaseAdmin(conf);
     try {
-      HBaseAdmin admin = new HBaseAdmin(conf);
       // create a table a pre-splits regions.
       // The number of splits is set as:
       // region servers * regions per region server).
@@ -2513,8 +2540,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       LOG.error("Master not running", e);
       throw new IOException(e);
     } catch (TableExistsException e) {
-      LOG.warn("Table " + Bytes.toStringBinary(tableName) +
+      LOG.warn("Table " + Bytes.toStringBinary(desc.getName()) +
         " already exists, continuing");
+    } finally {
+      admin.close();
     }
     return totalNumberOfRegions;
   }

TestHTableMultiplexer.java

@@ -77,7 +77,7 @@ public class TestHTableMultiplexer {
     HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
         Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
-    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_REGIONS);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE, NUM_REGIONS);
     byte[][] startRows = ht.getStartKeys();
     byte[][] endRows = ht.getEndKeys();

TestCoprocessorEndpoint.java

@@ -91,7 +91,7 @@ public class TestCoprocessorEndpoint {
     HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
     desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
     admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
-    util.waitUntilAllRegionsAssigned(3);
+    util.waitUntilAllRegionsAssigned(TEST_TABLE, 3);
     admin.close();
     HTable table = new HTable(conf, TEST_TABLE);

TestMasterObserver.java

@@ -1110,7 +1110,7 @@ public class TestMasterObserver {
     try {
       int countOfRegions = UTIL.createMultiRegions(table, TEST_FAMILY);
-      UTIL.waitUntilAllRegionsAssigned(countOfRegions);
+      UTIL.waitUntilAllRegionsAssigned(TEST_TABLE, countOfRegions);
       NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
       Map.Entry<HRegionInfo, ServerName> firstGoodPair = null;

TestRegionServerCoprocessorExceptionWithAbort.java

@@ -75,7 +75,7 @@ public class TestRegionServerCoprocessorExceptionWithAbort {
     byte[] TEST_FAMILY = Bytes.toBytes("aaa");
     HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
-    TEST_UTIL.waitUntilAllRegionsAssigned(TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
+    TEST_UTIL.waitUntilAllRegionsAssigned(TEST_TABLE, TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
     // Note which regionServer will abort (after put is attempted).
     final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);

TestRegionServerCoprocessorExceptionWithRemove.java

@@ -91,7 +91,7 @@ public class TestRegionServerCoprocessorExceptionWithRemove {
     byte[] TEST_FAMILY = Bytes.toBytes("aaa");
     HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
-    TEST_UTIL.waitUntilAllRegionsAssigned(
+    TEST_UTIL.waitUntilAllRegionsAssigned(TEST_TABLE,
         TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
     // Note which regionServer that should survive the buggy coprocessor's
     // prePut().

TestMasterTransitions.java

@@ -59,11 +59,12 @@ public class TestMasterTransitions {
   @BeforeClass public static void beforeAllTests() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     TEST_UTIL.startMiniCluster(2);
+    byte[] tableName = Bytes.toBytes(TABLENAME);
     // Create a table of three families. This will assign a region.
-    TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
+    TEST_UTIL.createTable(tableName, FAMILIES);
     HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
     int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
-    TEST_UTIL.waitUntilAllRegionsAssigned(countOfRegions);
+    TEST_UTIL.waitUntilAllRegionsAssigned(tableName, countOfRegions);
     addToEachStartKey(countOfRegions);
     t.close();
   }

TestHLogFiltering.java

@@ -102,7 +102,7 @@ public class TestHLogFiltering {
         table.flushCommits();
       }
     }
-    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME, NUM_RS);
   }

   @Test

MultiThreadedAction.java

@@ -268,6 +268,14 @@ public abstract class MultiThreadedAction {
     sb.append(v);
   }

+  protected static void appendToStatus(StringBuilder sb, String desc,
+      String v) {
+    sb.append(", ");
+    sb.append(desc);
+    sb.append("=");
+    sb.append(v);
+  }
+
   /**
    * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
    * Does not verify cf/column integrity.
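This String overload complements the existing numeric appendToStatus variants so subclasses can report non-numeric state in their status lines. For example, from within a MultiThreadedAction subclass (the key and value are hypothetical):

    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "mutationType", "put"); // appends ", mutationType=put"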

MultiThreadedWriter.java

@@ -142,7 +142,7 @@ public class MultiThreadedWriter extends MultiThreadedAction {
             put.add(cf, column, value);
             ++columnCount;
             if (!isMultiPut) {
-              insert(put, rowKeyBase);
+              insert(table, put, rowKeyBase);
               numCols.addAndGet(1);
               put = new Put(rowKey);
             }
@@ -152,7 +152,7 @@ public class MultiThreadedWriter extends MultiThreadedAction {
           if (verbose) {
             LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
           }
-          insert(put, rowKeyBase);
+          insert(table, put, rowKeyBase);
           numCols.addAndGet(columnCount);
         }
         if (trackInsertedKeys) {
@@ -168,53 +168,53 @@ public class MultiThreadedWriter extends MultiThreadedAction {
         numThreadsWorking.decrementAndGet();
       }
     }
+  }

-  public void insert(Put put, long keyBase) {
+  public void insert(HTable table, Put put, long keyBase) {
     try {
       long start = System.currentTimeMillis();
       table.put(put);
       totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
     } catch (IOException e) {
       failedKeySet.add(keyBase);
       String exceptionInfo;
       if (e instanceof RetriesExhaustedWithDetailsException) {
         RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
         exceptionInfo = aggEx.getExhaustiveDescription();
       } else {
         StringWriter stackWriter = new StringWriter();
         PrintWriter pw = new PrintWriter(stackWriter);
         e.printStackTrace(pw);
         pw.flush();
         exceptionInfo = StringUtils.stringifyException(e);
       }
       LOG.error("Failed to insert: " + keyBase + "; region information: "
-          + getRegionDebugInfoSafe(put.getRow()) + "; errors: "
+          + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
           + exceptionInfo);
     }
   }

-  private String getRegionDebugInfoSafe(byte[] rowKey) {
+  private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
     HRegionLocation cached = null, real = null;
     try {
       cached = table.getRegionLocation(rowKey, false);
       real = table.getRegionLocation(rowKey, true);
     } catch (Throwable t) {
       // Cannot obtain region information for another catch block - too bad!
     }
     String result = "no information can be obtained";
     if (cached != null) {
       result = "cached: " + cached.toString();
     }
     if (real != null) {
       if (real.equals(cached)) {
         result += "; cache is up to date";
       } else {
         result = (cached != null) ? (result + "; ") : "";
         result += "real: " + real.toString();
       }
     }
     return result;
   }
-  }

   /**
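Threading the HTable through as a parameter decouples insert() and getRegionDebugInfoSafe() from the writer thread's private table field, so code outside the writer thread, including subclasses of MultiThreadedWriter, can reuse the shared timing, failure bookkeeping, and region-debug logging. A minimal sketch (rowKey, cf, column, value and keyBase are placeholders, not from this patch):

    HTable myTable = new HTable(conf, tableName);
    Put put = new Put(rowKey);
    put.add(cf, column, value);
    // Timing, failedKeySet accounting and region debug info are handled centrally:
    insert(myTable, put, keyBase);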

TestMiniClusterLoadSequential.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -106,6 +107,19 @@ public class TestMiniClusterLoadSequential {
     TEST_UTIL.shutdownMiniCluster();
   }

+  protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
+      Configuration conf, byte[] tableName, double verifyPercent) {
+    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
+    return reader;
+  }
+
+  protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
+      Configuration conf, byte[] tableName) {
+    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
+    writer.setMultiPut(isMultiPut);
+    return writer;
+  }
+
   @Test(timeout=TIMEOUT_MS)
   public void loadTest() throws Exception {
     prepareForLoadTest();
@@ -124,6 +138,12 @@ public class TestMiniClusterLoadSequential {
     assertEquals(numKeys, readerThreads.getNumKeysVerified());
   }

+  protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd)
+      throws IOException {
+    int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd);
+    TEST_UTIL.waitUntilAllRegionsAssigned(htd.getName(), numRegions);
+  }
+
   protected void prepareForLoadTest() throws IOException {
     LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding +
         ", isMultiPut=" + isMultiPut);
@@ -135,15 +155,15 @@ public class TestMiniClusterLoadSequential {
     }
     admin.close();

-    int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
-        TABLE, CF, compression, dataBlockEncoding);
-
-    TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor hcd = new HColumnDescriptor(CF)
+        .setCompressionType(compression)
+        .setDataBlockEncoding(dataBlockEncoding);
+    createPreSplitLoadTestTable(htd, hcd);

     LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
-    writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE);
-    writerThreads.setMultiPut(isMultiPut);
-    readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100);
+    writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
+    readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
   }

   protected int numKeys() {
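Taken together, the protected prepareReaderThreads, prepareWriterThreads, and createPreSplitLoadTestTable hooks turn TestMiniClusterLoadSequential into a base class that load-test variants can extend by overriding a single method. A sketch of such a subclass (the class name is hypothetical and the constructor signature is an assumption based on this parameterized test, not part of the patch):

    public class TestMiniClusterLoadCustom extends TestMiniClusterLoadSequential {
      public TestMiniClusterLoadCustom(boolean isMultiPut, DataBlockEncoding encoding) {
        super(isMultiPut, encoding);
      }

      @Override
      protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
          Configuration conf, byte[] tableName) {
        // Substitute a customized writer here; the base class wires it into loadTest().
        MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
        writer.setMultiPut(isMultiPut);
        return writer;
      }
    }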