From b73cacfd8dcb0b4e378e119704286b8a79acf552 Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Fri, 29 Mar 2013 19:11:24 +0000 Subject: [PATCH] HBASE-8209. Improve LoadTest extensibility git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1462611 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/HBaseTestingUtility.java | 75 ++++++++++----- .../hbase/client/TestHTableMultiplexer.java | 2 +- .../coprocessor/TestCoprocessorEndpoint.java | 2 +- .../hbase/coprocessor/TestMasterObserver.java | 2 +- ...onServerCoprocessorExceptionWithAbort.java | 2 +- ...nServerCoprocessorExceptionWithRemove.java | 2 +- .../hbase/master/TestMasterTransitions.java | 5 +- .../regionserver/wal/TestHLogFiltering.java | 2 +- .../hbase/util/MultiThreadedAction.java | 8 ++ .../hbase/util/MultiThreadedWriter.java | 92 +++++++++---------- .../util/TestMiniClusterLoadSequential.java | 34 +++++-- 11 files changed, 142 insertions(+), 84 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 013e7a3f563..5eb47b0d643 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2164,29 +2164,45 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param countOfRegions How many regions in .META. * @throws IOException */ - public void waitUntilAllRegionsAssigned(final int countOfRegions) + public void waitUntilAllRegionsAssigned(final byte[] tableName, final int countOfRegions) throws IOException { + int retries = 30; // We may wait up to 30 seconds + int rows = 0; HTable meta = new HTable(getConfiguration(), HConstants.META_TABLE_NAME); - while (true) { - int rows = 0; - Scan scan = new Scan(); - scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = - r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - if (b == null || b.length <= 0) { + try { + do { + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + try { + for (Result r = null; (r = s.next()) != null;) { + byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + HRegionInfo hri = HRegionInfo.parseFromOrNull(b); + if (hri != null && Bytes.equals(hri.getTableName(), tableName)) { + b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + if (b == null || b.length <= 0) { + continue; + } + rows++; + } + } + } finally { + s.close(); + } + // If I get to here and all rows have a Server, then all have been assigned. + if (rows == countOfRegions) { break; } - rows++; - } - s.close(); - // If I get to here and all rows have a Server, then all have been assigned. - if (rows == countOfRegions) { - break; - } - LOG.info("Found=" + rows); - Threads.sleep(200); + LOG.info("Found=" + rows); + Threads.sleep(1000); + } while (--retries > 0); + } finally { + meta.close(); + } + if (rows != countOfRegions) { + throw new IOException("Timed out waiting for " + countOfRegions + " regions of " + + Bytes.toStringBinary(tableName) + " to come online"); } } @@ -2485,12 +2501,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); hcd.setDataBlockEncoding(dataBlockEncoding); hcd.setCompressionType(compression); - desc.addFamily(hcd); + return createPreSplitLoadTestTable(conf, desc, hcd); + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + HTableDescriptor desc, HColumnDescriptor hcd) throws IOException { + if (!desc.hasFamily(hcd.getName())) { + desc.addFamily(hcd); + } int totalNumberOfRegions = 0; + HBaseAdmin admin = new HBaseAdmin(conf); try { - HBaseAdmin admin = new HBaseAdmin(conf); - // create a table a pre-splits regions. // The number of splits is set as: // region servers * regions per region server). @@ -2513,8 +2540,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { LOG.error("Master not running", e); throw new IOException(e); } catch (TableExistsException e) { - LOG.warn("Table " + Bytes.toStringBinary(tableName) + + LOG.warn("Table " + Bytes.toStringBinary(desc.getName()) + " already exists, continuing"); + } finally { + admin.close(); } return totalNumberOfRegions; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java index c2deb56c37a..a801a02de28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java @@ -77,7 +77,7 @@ public class TestHTableMultiplexer { HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - TEST_UTIL.waitUntilAllRegionsAssigned(NUM_REGIONS); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE, NUM_REGIONS); byte[][] startRows = ht.getStartKeys(); byte[][] endRows = ht.getEndKeys(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java index d4f051e403d..481029b3bf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java @@ -91,7 +91,7 @@ public class TestCoprocessorEndpoint { HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]}); - util.waitUntilAllRegionsAssigned(3); + util.waitUntilAllRegionsAssigned(TEST_TABLE, 3); admin.close(); HTable table = new HTable(conf, TEST_TABLE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java index 4673ba1c805..d12e5926537 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java @@ -1110,7 +1110,7 @@ public class TestMasterObserver { try { int countOfRegions = UTIL.createMultiRegions(table, TEST_FAMILY); - UTIL.waitUntilAllRegionsAssigned(countOfRegions); + UTIL.waitUntilAllRegionsAssigned(TEST_TABLE, countOfRegions); NavigableMap regions = table.getRegionLocations(); Map.Entry firstGoodPair = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java index 653d45d8e77..16da09a7fd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java @@ -75,7 +75,7 @@ public class TestRegionServerCoprocessorExceptionWithAbort { byte[] TEST_FAMILY = Bytes.toBytes("aaa"); HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY); - TEST_UTIL.waitUntilAllRegionsAssigned(TEST_UTIL.createMultiRegions(table, TEST_FAMILY)); + TEST_UTIL.waitUntilAllRegionsAssigned(TEST_TABLE, TEST_UTIL.createMultiRegions(table, TEST_FAMILY)); // Note which regionServer will abort (after put is attempted). final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithRemove.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithRemove.java index 0534d3ab44d..172c28a5235 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithRemove.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithRemove.java @@ -91,7 +91,7 @@ public class TestRegionServerCoprocessorExceptionWithRemove { byte[] TEST_FAMILY = Bytes.toBytes("aaa"); HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY); - TEST_UTIL.waitUntilAllRegionsAssigned( + TEST_UTIL.waitUntilAllRegionsAssigned(TEST_TABLE, TEST_UTIL.createMultiRegions(table, TEST_FAMILY)); // Note which regionServer that should survive the buggy coprocessor's // prePut(). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java index 4b25ae784a2..90620697e5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java @@ -59,11 +59,12 @@ public class TestMasterTransitions { @BeforeClass public static void beforeAllTests() throws Exception { TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); TEST_UTIL.startMiniCluster(2); + byte[] tableName = Bytes.toBytes(TABLENAME); // Create a table of three families. This will assign a region. - TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); + TEST_UTIL.createTable(tableName, FAMILIES); HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); - TEST_UTIL.waitUntilAllRegionsAssigned(countOfRegions); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName, countOfRegions); addToEachStartKey(countOfRegions); t.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java index 3c06fab5a78..c170146d524 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java @@ -102,7 +102,7 @@ public class TestHLogFiltering { table.flushCommits(); } } - TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME, NUM_RS); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index c988eca2e97..72f7a90877d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -268,6 +268,14 @@ public abstract class MultiThreadedAction { sb.append(v); } + protected static void appendToStatus(StringBuilder sb, String desc, + String v) { + sb.append(", "); + sb.append(desc); + sb.append("="); + sb.append(v); + } + /** * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. * Does not verify cf/column integrity. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index 7d0da379c21..4fa8f7418dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -142,7 +142,7 @@ public class MultiThreadedWriter extends MultiThreadedAction { put.add(cf, column, value); ++columnCount; if (!isMultiPut) { - insert(put, rowKeyBase); + insert(table, put, rowKeyBase); numCols.addAndGet(1); put = new Put(rowKey); } @@ -152,7 +152,7 @@ public class MultiThreadedWriter extends MultiThreadedAction { if (verbose) { LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns"); } - insert(put, rowKeyBase); + insert(table, put, rowKeyBase); numCols.addAndGet(columnCount); } if (trackInsertedKeys) { @@ -168,53 +168,53 @@ public class MultiThreadedWriter extends MultiThreadedAction { numThreadsWorking.decrementAndGet(); } } + } - public void insert(Put put, long keyBase) { - try { - long start = System.currentTimeMillis(); - table.put(put); - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(keyBase); - String exceptionInfo; - if (e instanceof RetriesExhaustedWithDetailsException) { - RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; - exceptionInfo = aggEx.getExhaustiveDescription(); - } else { - StringWriter stackWriter = new StringWriter(); - PrintWriter pw = new PrintWriter(stackWriter); - e.printStackTrace(pw); - pw.flush(); - exceptionInfo = StringUtils.stringifyException(e); - } - LOG.error("Failed to insert: " + keyBase + "; region information: " - + getRegionDebugInfoSafe(put.getRow()) + "; errors: " - + exceptionInfo); + public void insert(HTable table, Put put, long keyBase) { + try { + long start = System.currentTimeMillis(); + table.put(put); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + "; region information: " + + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } + } + + private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) { + HRegionLocation cached = null, real = null; + try { + cached = table.getRegionLocation(rowKey, false); + real = table.getRegionLocation(rowKey, true); + } catch (Throwable t) { + // Cannot obtain region information for another catch block - too bad! + } + String result = "no information can be obtained"; + if (cached != null) { + result = "cached: " + cached.toString(); + } + if (real != null) { + if (real.equals(cached)) { + result += "; cache is up to date"; + } else { + result = (cached != null) ? (result + "; ") : ""; + result += "real: " + real.toString(); } } - - private String getRegionDebugInfoSafe(byte[] rowKey) { - HRegionLocation cached = null, real = null; - try { - cached = table.getRegionLocation(rowKey, false); - real = table.getRegionLocation(rowKey, true); - } catch (Throwable t) { - // Cannot obtain region information for another catch block - too bad! - } - String result = "no information can be obtained"; - if (cached != null) { - result = "cached: " + cached.toString(); - } - if (real != null) { - if (real.equals(cached)) { - result += "; cache is up to date"; - } else { - result = (cached != null) ? (result + "; ") : ""; - result += "real: " + real.toString(); - } - } - return result; - } + return result; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java index a7f5eb7da2a..414c18b36f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.exceptions.TableNotFoundException; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -106,6 +107,19 @@ public class TestMiniClusterLoadSequential { TEST_UTIL.shutdownMiniCluster(); } + protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen, + Configuration conf, byte[] tableName, double verifyPercent) { + MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + return reader; + } + + protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen, + Configuration conf, byte[] tableName) { + MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName); + writer.setMultiPut(isMultiPut); + return writer; + } + @Test(timeout=TIMEOUT_MS) public void loadTest() throws Exception { prepareForLoadTest(); @@ -124,6 +138,12 @@ public class TestMiniClusterLoadSequential { assertEquals(numKeys, readerThreads.getNumKeysVerified()); } + protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd) + throws IOException { + int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd); + TEST_UTIL.waitUntilAllRegionsAssigned(htd.getName(), numRegions); + } + protected void prepareForLoadTest() throws IOException { LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding + ", isMultiPut=" + isMultiPut); @@ -135,15 +155,15 @@ public class TestMiniClusterLoadSequential { } admin.close(); - int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf, - TABLE, CF, compression, dataBlockEncoding); - - TEST_UTIL.waitUntilAllRegionsAssigned(numRegions); + HTableDescriptor htd = new HTableDescriptor(TABLE); + HColumnDescriptor hcd = new HColumnDescriptor(CF) + .setCompressionType(compression) + .setDataBlockEncoding(dataBlockEncoding); + createPreSplitLoadTestTable(htd, hcd); LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); - writerThreads = new MultiThreadedWriter(dataGen, conf, TABLE); - writerThreads.setMultiPut(isMultiPut); - readerThreads = new MultiThreadedReader(dataGen, conf, TABLE, 100); + writerThreads = prepareWriterThreads(dataGen, conf, TABLE); + readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100); } protected int numKeys() {