From 59825ac875d90ae0083b992b72dc9a57f9c5941d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 29 Aug 2007 23:39:52 +0000 Subject: [PATCH] HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (localTestSingleRegionTable, localTestMultiRegionTable, verify): Added. M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Javadoc for addContents and Loader interface and implementations. Methods have been made static so accessible w/o subclassing. M src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Guts of TestSplit has been moved here so other tests can have access to a multiregion table. M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Bulk moved to MultiRegionTable utility class. Use this new class instead. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Added '@deprecated' javadoc. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Was throwing RuntimeException when a msgQueue.put was interrupted but this is a likely event on shutdown. Log a message instead. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Actually fix for HADOOP-1785... reverse test of row comparison. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@570983 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 8 +- src/java/org/apache/hadoop/hbase/HMaster.java | 7 +- src/java/org/apache/hadoop/hbase/HTable.java | 10 +- .../hadoop/hbase/mapred/TableInputFormat.java | 2 +- .../apache/hadoop/hbase/HBaseTestCase.java | 56 +++- .../apache/hadoop/hbase/MultiRegionTable.java | 257 ++++++++++++++++++ .../org/apache/hadoop/hbase/TestSplit.java | 205 +------------- .../hadoop/hbase/TestTableMapReduce.java | 216 ++++++++++----- 8 files changed, 479 insertions(+), 282 deletions(-) create mode 100644 src/test/org/apache/hadoop/hbase/MultiRegionTable.java diff --git a/CHANGES.txt b/CHANGES.txt index 09dc25cda5a..c593c21efe6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,23 +7,25 @@ Trunk (unreleased changes) NEW FEATURES HADOOP-1768 FS command using Hadoop FsShell operations - (Edward Yoon via Stack) + (Edward Yoon via Stack) OPTIMIZATIONS BUG FIXES HADOOP-1527 Region server won't start because logdir exists HADOOP-1723 If master asks region server to shut down, by-pass return of - shutdown message + shutdown message HADOOP-1729 Recent renaming or META tables breaks hbase shell HADOOP-1730 unexpected null value causes META scanner to exit (silently) HADOOP-1747 On a cluster, on restart, regions multiply assigned HADOOP-1776 Fix for sporadic compaction failures closing and moving - compaction result + compaction result HADOOP-1780 Regions are still being doubly assigned HADOOP-1797 Fix NPEs in MetaScanner constructor HADOOP-1799 Incorrect classpath in binary version of Hadoop HADOOP-1805 Region server hang on exit + HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug + (Ning Li via Stack) IMPROVEMENTS HADOOP-1737 Make HColumnDescriptor data publically members settable diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index af825fce4ed..60f14efef88 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -3014,13 +3014,16 @@ HMasterRegionInterface, Runnable { // NOTE: If the server was serving the root region, we cannot reassign it // here because the new server will start serving the root region before // the PendingServerShutdown operation has a chance to split the log file. - try { if (info != null) { msgQueue.put(new PendingServerShutdown(info)); } } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was interrupted.", e); + // continue. We used to throw a RuntimeException here but on exit + // this put is often interrupted. For now, just log these iterrupts + // rather than throw an exception + LOG.warn("MsgQueue.put was interrupted (If we are exiting, this msg " + + "can be ignored"); } } } diff --git a/src/java/org/apache/hadoop/hbase/HTable.java b/src/java/org/apache/hadoop/hbase/HTable.java index baf24f012c8..de259d2222e 100644 --- a/src/java/org/apache/hadoop/hbase/HTable.java +++ b/src/java/org/apache/hadoop/hbase/HTable.java @@ -445,7 +445,7 @@ public class HTable implements HConstants { * @param row name of row to be updated * @return lockid to be used in subsequent put, delete and commit calls * - * Deprecated. Batch operations are now the default. startBatchUpdate is now + * @deprecated Batch operations are now the default. startBatchUpdate is now * implemented by @see {@link #startUpdate(Text)} */ @Deprecated @@ -457,7 +457,7 @@ public class HTable implements HConstants { * Abort a batch mutation * @param lockid lock id returned by startBatchUpdate * - * Deprecated. Batch operations are now the default. abortBatch is now + * @deprecated Batch operations are now the default. abortBatch is now * implemented by @see {@link #abort(long)} */ @Deprecated @@ -471,7 +471,7 @@ public class HTable implements HConstants { * @param lockid lock id returned by startBatchUpdate * @throws IOException * - * Deprecated. Batch operations are now the default. commitBatch(long) is now + * @deprecated Batch operations are now the default. commitBatch(long) is now * implemented by @see {@link #commit(long)} */ @Deprecated @@ -486,7 +486,7 @@ public class HTable implements HConstants { * @param timestamp time to associate with all the changes * @throws IOException * - * Deprecated. Batch operations are now the default. commitBatch(long, long) + * @deprecated Batch operations are now the default. commitBatch(long, long) * is now implemented by @see {@link #commit(long, long)} */ @Deprecated @@ -622,7 +622,7 @@ public class HTable implements HConstants { * * @param lockid - lock id returned from startUpdate * - * Deprecated. Batch updates are now the default. Consequently this method + * @deprecated Batch updates are now the default. Consequently this method * does nothing. */ @Deprecated diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java index cb75ff184d9..e407570a950 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java +++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -144,7 +144,7 @@ implements InputFormat, JobConfigurable { if(hasMore) { if(m_endRow.getLength() > 0 && - (tKey.getRow().compareTo(m_endRow) < 0)) { + (tKey.getRow().compareTo(m_endRow) >= 0)) { hasMore = false; diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index e597eab9cbd..ac7567df7f9 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -103,7 +103,15 @@ public abstract class HBaseTestCase extends TestCase { return htd; } - protected void addContent(final HRegion r, final String column) + /** + * Add content to region r on the passed column + * column. + * Adds data of the from 'aaa', 'aab', etc where key and value are the same. + * @param r + * @param column + * @throws IOException + */ + protected static void addContent(final HRegion r, final String column) throws IOException { Text startKey = r.getRegionInfo().getStartKey(); Text endKey = r.getRegionInfo().getEndKey(); @@ -113,14 +121,32 @@ public abstract class HBaseTestCase extends TestCase { } addContent(new HRegionLoader(r), column, startKeyBytes, endKey); } - - protected void addContent(final Loader updater, final String column) + + /** + * Add content to region r on the passed column + * column. + * Adds data of the from 'aaa', 'aab', etc where key and value are the same. + * @param updater An instance of {@link Loader}. + * @param column + * @throws IOException + */ + protected static void addContent(final Loader updater, final String column) throws IOException { addContent(updater, column, new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null); } - - protected void addContent(final Loader updater, final String column, + + /** + * Add content to region r on the passed column + * column. + * Adds data of the from 'aaa', 'aab', etc where key and value are the same. + * @param updater An instance of {@link Loader}. + * @param column + * @param startKeyBytes Where to start the rows inserted + * @param endKey Where to stop inserting rows. + * @throws IOException + */ + protected static void addContent(final Loader updater, final String column, final byte [] startKeyBytes, final Text endKey) throws IOException { // Add rows of three characters. The first character starts with the @@ -156,14 +182,21 @@ public abstract class HBaseTestCase extends TestCase { } } - public interface Loader { + /** + * Interface used by the addContent methods so either a HTable or a HRegion + * can be passed to the methods. + */ + public static interface Loader { public long startBatchUpdate(final Text row) throws IOException; public void put(long lockid, Text column, byte val[]) throws IOException; public void commit(long lockid) throws IOException; public void abort(long lockid) throws IOException; } - public class HRegionLoader implements Loader { + /** + * A class that makes a {@link Loader} out of a {@link HRegion} + */ + public static class HRegionLoader implements Loader { final HRegion region; public HRegionLoader(final HRegion HRegion) { super(); @@ -182,8 +215,11 @@ public abstract class HBaseTestCase extends TestCase { return this.region.startUpdate(row); } } - - public class HTableLoader implements Loader { + + /** + * A class that makes a {@link Loader} out of a {@link HTable} + */ + public static class HTableLoader implements Loader { final HTable table; public HTableLoader(final HTable table) { super(); @@ -199,7 +235,7 @@ public abstract class HBaseTestCase extends TestCase { this.table.put(lockid, column, val); } public long startBatchUpdate(Text row) { - return this.table.startBatchUpdate(row); + return this.table.startUpdate(row); } } } \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java new file mode 100644 index 00000000000..ce20946373a --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -0,0 +1,257 @@ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ConcurrentModificationException; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Text; + +/** + * Utility class to build a table of multiple regions. + */ +public class MultiRegionTable extends HBaseTestCase { + static final Log LOG = LogFactory.getLog(MultiRegionTable.class.getName()); + + /** + * Make a multi-region table. Presumption is that table already exists. + * Makes it multi-region by filling with data and provoking splits. + * Asserts parent region is cleaned up after its daughter splits release all + * references. + * @param conf + * @param cluster + * @param localFs + * @param tableName + * @param columnName + * @throws IOException + */ + public static void makeMultiRegionTable(Configuration conf, + MiniHBaseCluster cluster, FileSystem localFs, String tableName, + String columnName) + throws IOException { + // This size should make it so we always split using the addContent + // below. After adding all data, the first region is 1.3M. Should + // set max filesize to be <= 1M. + assertTrue(conf.getLong("hbase.hregion.max.filesize", + HConstants.DEFAULT_MAX_FILE_SIZE) <= 1024 * 1024); + + final int retries = 10; + Path d = cluster.regionThreads.get(0).getRegionServer().rootDir; + FileSystem fs = (cluster.getDFSCluster() == null) ? + localFs : cluster.getDFSCluster().getFileSystem(); + assertTrue(fs != null); + + // Get connection on the meta table and get count of rows. + HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + int count = count(meta, HConstants.COLUMN_FAMILY_STR); + HTable t = new HTable(conf, new Text(tableName)); + addContent(new HTableLoader(t), columnName); + // All is running in the one JVM so I should be able to get the single + // region instance and bring on a split. + HRegionInfo hri = + t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo(); + HRegion r = cluster.regionThreads.get(0).getRegionServer(). + onlineRegions.get(hri.getRegionName()); + // Flush will provoke a split next time the split-checker thread runs. + r.flushcache(false); + // Now, wait until split makes it into the meta table. + for (int i = 0; i < retries && + (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // continue + } + } + int oldCount = count; + count = count(meta, HConstants.COLUMN_FAMILY_STR); + if (count <= oldCount) { + throw new IOException("Failed waiting on splits to show up"); + } + // Get info on the parent from the meta table. Pass in 'hri'. Its the + // region we have been dealing with up to this. Its the parent of the + // region split. + Map data = getSplitParentInfo(meta, hri); + HRegionInfo parent = + Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO)); + assertTrue(parent.isOffline()); + assertTrue(parent.isSplit()); + HRegionInfo splitA = + Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA)); + HRegionInfo splitB = + Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB)); + Path parentDir = HRegion.getRegionDir(d, parent.getRegionName()); + assertTrue(fs.exists(parentDir)); + LOG.info("Split happened. Parent is " + parent.getRegionName() + + " and daughters are " + splitA.getRegionName() + ", " + + splitB.getRegionName()); + // Recalibrate will cause us to wait on new regions' deployment + recalibrate(t, new Text(columnName), retries); + // Compact a region at a time so we can test case where one region has + // no references but the other still has some + compact(cluster, splitA); + // Wait till the parent only has reference to remaining split, one that + // still has references. + while (getSplitParentInfo(meta, parent).size() == 3) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // continue + } + } + LOG.info("Parent split returned " + + getSplitParentInfo(meta, parent).keySet().toString()); + // Call second split. + compact(cluster, splitB); + // Now wait until parent disappears. + LOG.info("Waiting on parent " + parent.getRegionName() + + " to disappear"); + for (int i = 0; i < retries && + getSplitParentInfo(meta, parent) != null; i++) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(getSplitParentInfo(meta, parent) == null); + // Assert cleaned up. + for (int i = 0; i < retries && fs.exists(parentDir); i++) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // continue + } + } + assertFalse(fs.exists(parentDir)); + } + + /* + * Count of rows in table for given column. + * @param t + * @param column + * @return + * @throws IOException + */ + private static int count(final HTable t, final String column) + throws IOException { + int size = 0; + Text [] cols = new Text[] {new Text(column)}; + HScannerInterface s = t.obtainScanner(cols, HConstants.EMPTY_START_ROW, + System.currentTimeMillis(), null); + try { + HStoreKey curKey = new HStoreKey(); + TreeMap curVals = new TreeMap(); + while(s.next(curKey, curVals)) { + size++; + } + return size; + } finally { + s.close(); + } + } + + /* + * @return Return row info for passed in region or null if not found in scan. + */ + private static Map getSplitParentInfo(final HTable t, + final HRegionInfo parent) + throws IOException { + HScannerInterface s = t.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, + HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null); + try { + HStoreKey curKey = new HStoreKey(); + TreeMap curVals = new TreeMap(); + while(s.next(curKey, curVals)) { + HRegionInfo hri = Writables. + getHRegionInfoOrNull(curVals.get(HConstants.COL_REGIONINFO)); + if (hri == null) { + continue; + } + if (hri.getRegionName().toString(). + equals(parent.getRegionName().toString())) { + return curVals; + } + } + return null; + } finally { + s.close(); + } + } + + /* + * Recalibrate passed in HTable. Run after change in region geography. + * Open a scanner on the table. This will force HTable to recalibrate + * and in doing so, will force us to wait until the new child regions + * come on-line (since they are no longer automatically served by the + * HRegionServer that was serving the parent. In this test they will + * end up on the same server (since there is only one), but we have to + * wait until the master assigns them. + * @param t + * @param retries + */ + private static void recalibrate(final HTable t, final Text column, + final int retries) throws IOException { + for (int i = 0; i < retries; i++) { + try { + HScannerInterface s = + t.obtainScanner(new Text[] {column}, HConstants.EMPTY_START_ROW); + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + s.next(key, results); + break; + } finally { + s.close(); + } + } catch (NotServingRegionException x) { + System.out.println("it's alright"); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // continue + } + } + } + } + + /* + * Compact the passed in region r. + * @param cluster + * @param r + * @throws IOException + */ + private static void compact(final MiniHBaseCluster cluster, + final HRegionInfo r) + throws IOException { + LOG.info("Starting compaction"); + for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) { + SortedMap regions = + thread.getRegionServer().onlineRegions; + // Retry if ConcurrentModification... alternative of sync'ing is not + // worth it for sake of unit test. + for (int i = 0; i < 10; i++) { + try { + for (HRegion online: regions.values()) { + if (online.getRegionName().toString(). + equals(r.getRegionName().toString())) { + online.compactStores(); + } + } + break; + } catch (ConcurrentModificationException e) { + LOG.warn("Retrying because ..." + e.toString() + " -- one or " + + "two should be fine"); + continue; + } + } + } + } +} \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/TestSplit.java b/src/test/org/apache/hadoop/hbase/TestSplit.java index 4344d9c6105..77d85cf5716 100644 --- a/src/test/org/apache/hadoop/hbase/TestSplit.java +++ b/src/test/org/apache/hadoop/hbase/TestSplit.java @@ -20,16 +20,10 @@ package org.apache.hadoop.hbase; import java.io.IOException; -import java.util.ConcurrentModificationException; -import java.util.Map; -import java.util.SortedMap; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -148,158 +142,24 @@ public class TestSplit extends HBaseTestCase { /** * Test that a region is cleaned up after its daughter splits release all * references. - * @throws Exception + * @throws IOException */ - public void testSplitRegionIsDeleted() throws Exception { - final int retries = 10; + public void testSplitRegionIsDeleted() throws IOException { // Start up a hbase cluster MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true); - Path d = cluster.regionThreads.get(0).getRegionServer().rootDir; - FileSystem fs = (cluster.getDFSCluster() == null)? - this.localFs: - cluster.getDFSCluster().getFileSystem(); - HTable meta = null; - HTable t = null; try { // Create a table. HBaseAdmin admin = new HBaseAdmin(this.conf); admin.createTable(createTableDescriptor(getName())); - // Get connection on the meta table and get count of rows. - meta = new HTable(this.conf, HConstants.META_TABLE_NAME); - int count = count(meta, HConstants.COLUMN_FAMILY_STR); - t = new HTable(this.conf, new Text(getName())); - addContent(new HTableLoader(t), COLFAMILY_NAME3); - // All is running in the one JVM so I should be able to get the single - // region instance and bring on a split. - HRegionInfo hri = - t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo(); - HRegion r = cluster.regionThreads.get(0).getRegionServer(). - onlineRegions.get(hri.getRegionName()); - // Flush will provoke a split next time the split-checker thread runs. - r.flushcache(false); - // Now, wait until split makes it into the meta table. - for (int i = 0; i < retries && - (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) { - Thread.sleep(5000); - } - int oldCount = count; - count = count(meta, HConstants.COLUMN_FAMILY_STR); - if (count <= oldCount) { - throw new IOException("Failed waiting on splits to show up"); - } - // Get info on the parent from the meta table. Pass in 'hri'. Its the - // region we have been dealing with up to this. Its the parent of the - // region split. - Map data = getSplitParentInfo(meta, hri); - HRegionInfo parent = - Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO)); - assertTrue(parent.isOffline()); - assertTrue(parent.isSplit()); - HRegionInfo splitA = - Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA)); - HRegionInfo splitB = - Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB)); - Path parentDir = HRegion.getRegionDir(d, parent.getRegionName()); - assertTrue(fs.exists(parentDir)); - LOG.info("Split happened. Parent is " + parent.getRegionName() + - " and daughters are " + splitA.getRegionName() + ", " + - splitB.getRegionName()); - // Recalibrate will cause us to wait on new regions' deployment - recalibrate(t, new Text(COLFAMILY_NAME3), retries); - // Compact a region at a time so we can test case where one region has - // no references but the other still has some - compact(cluster, splitA); - // Wait till the parent only has reference to remaining split, one that - // still has references. - while (getSplitParentInfo(meta, parent).size() == 3) { - Thread.sleep(5000); - } - LOG.info("Parent split returned " + - getSplitParentInfo(meta, parent).keySet().toString()); - // Call second split. - compact(cluster, splitB); - // Now wait until parent disappears. - LOG.info("Waiting on parent " + parent.getRegionName() + - " to disappear"); - for (int i = 0; i < retries && - getSplitParentInfo(meta, parent) != null; i++) { - Thread.sleep(5000); - } - assertTrue(getSplitParentInfo(meta, parent) == null); - // Assert cleaned up. - for (int i = 0; i < retries && fs.exists(parentDir); i++) { - Thread.sleep(5000); - } - assertFalse(fs.exists(parentDir)); + // This builds a multi-region table by splitting. It will assert + // the parent region gets cleaned-up. + MultiRegionTable.makeMultiRegionTable(conf, cluster, + this.localFs, getName(), COLFAMILY_NAME3); } finally { cluster.shutdown(); } } - /* - * Compact the passed in region r. - * @param cluster - * @param r - * @throws IOException - */ - private void compact(final MiniHBaseCluster cluster, final HRegionInfo r) - throws IOException { - LOG.info("Starting compaction"); - for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) { - SortedMap regions = - thread.getRegionServer().onlineRegions; - // Retry if ConcurrentModification... alternative of sync'ing is not - // worth it for sake of unit test. - for (int i = 0; i < 10; i++) { - try { - for (HRegion online: regions.values()) { - if (online.getRegionName().toString(). - equals(r.getRegionName().toString())) { - online.compactStores(); - } - } - break; - } catch (ConcurrentModificationException e) { - LOG.warn("Retrying because ..." + e.toString() + " -- one or " + - "two should be fine"); - continue; - } - } - } - } - - /* - * Recalibrate passed in HTable. Run after change in region geography. - * Open a scanner on the table. This will force HTable to recalibrate - * and in doing so, will force us to wait until the new child regions - * come on-line (since they are no longer automatically served by the - * HRegionServer that was serving the parent. In this test they will - * end up on the same server (since there is only one), but we have to - * wait until the master assigns them. - * @param t - * @param retries - */ - private void recalibrate(final HTable t, final Text column, - final int retries) - throws IOException, InterruptedException { - for (int i = 0; i < retries; i++) { - try { - HScannerInterface s = - t.obtainScanner(new Text[] {column}, HConstants.EMPTY_START_ROW); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - s.next(key, results); - break; - } finally { - s.close(); - } - } catch (NotServingRegionException x) { - Thread.sleep(5000); - } - } - } - private void assertGet(final HRegion r, final String family, final Text k) throws IOException { // Now I have k, get values out and assert they are as expected. @@ -312,59 +172,6 @@ public class TestSplit extends HBaseTestCase { } } - /* - * @return Return row info for passed in region or null if not found in scan. - */ - private Map getSplitParentInfo(final HTable t, - final HRegionInfo parent) - throws IOException { - HScannerInterface s = t.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, - HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null); - try { - HStoreKey curKey = new HStoreKey(); - TreeMap curVals = new TreeMap(); - while(s.next(curKey, curVals)) { - HRegionInfo hri = Writables. - getHRegionInfoOrNull(curVals.get(HConstants.COL_REGIONINFO)); - if (hri == null) { - continue; - } - if (hri.getRegionName().toString(). - equals(parent.getRegionName().toString())) { - return curVals; - } - } - return null; - } finally { - s.close(); - } - } - - /* - * Count of rows in table for given column. - * @param t - * @param column - * @return - * @throws IOException - */ - private int count(final HTable t, final String column) - throws IOException { - int size = 0; - Text [] cols = new Text[] {new Text(column)}; - HScannerInterface s = t.obtainScanner(cols, HConstants.EMPTY_START_ROW, - System.currentTimeMillis(), null); - try { - HStoreKey curKey = new HStoreKey(); - TreeMap curVals = new TreeMap(); - while(s.next(curKey, curVals)) { - size++; - } - return size; - } finally { - s.close(); - } - } - /* * Assert first value in the passed region is firstValue. * @param r diff --git a/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java index 62b3e10bc59..d20f8595ae6 100644 --- a/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java +++ b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java @@ -21,26 +21,21 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.Map; -import java.util.Random; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.dfs.MiniDFSCluster; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.hadoop.io.Text; - import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.Reporter; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.MapWritable; - import org.apache.hadoop.hbase.mapred.TableMap; import org.apache.hadoop.hbase.mapred.TableOutputCollector; import org.apache.hadoop.hbase.mapred.IdentityTableReduce; @@ -49,15 +44,16 @@ import org.apache.hadoop.hbase.mapred.IdentityTableReduce; * Test Map/Reduce job over HBase tables */ public class TestTableMapReduce extends HBaseTestCase { - static final String TABLE_NAME = "test"; + private static final Log LOG = + LogFactory.getLog(TestTableMapReduce.class.getName()); + + static final String SINGLE_REGION_TABLE_NAME = "srtest"; + static final String MULTI_REGION_TABLE_NAME = "mrtest"; static final String INPUT_COLUMN = "contents:"; static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); static final String OUTPUT_COLUMN = "text:"; static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); - private Random rand; - private HTableDescriptor desc; - private MiniDFSCluster dfsCluster = null; private FileSystem fs; private Path dir; @@ -76,51 +72,23 @@ public class TestTableMapReduce extends HBaseTestCase { @Override public void setUp() throws Exception { super.setUp(); - rand = new Random(); - desc = new HTableDescriptor("test"); - desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); - desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); - + // This size is picked so the table is split into two + // after addContent in testMultiRegionTableMapReduce. + conf.setLong("hbase.hregion.max.filesize", 256 * 1024); dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); try { fs = dfsCluster.getFileSystem(); dir = new Path("/hbase"); fs.mkdirs(dir); - - // create the root and meta regions and insert the data region into the meta - - HRegion root = createNewHRegion(dir, conf, HGlobals.rootTableDesc, 0L, null, null); - HRegion meta = createNewHRegion(dir, conf, HGlobals.metaTableDesc, 1L, null, null); - HRegion.addRegionToMETA(root, meta); - - HRegion region = createNewHRegion(dir, conf, desc, rand.nextLong(), null, null); - HRegion.addRegionToMETA(meta, region); - - // insert some data into the test table - - for(int i = 0; i < values.length; i++) { - long lockid = region.startUpdate(new Text("row_" - + String.format("%1$05d", i))); - - region.put(lockid, TEXT_INPUT_COLUMN, values[i]); - region.commit(lockid, System.currentTimeMillis()); - } - - region.close(); - region.getLog().closeAndDelete(); - meta.close(); - meta.getLog().closeAndDelete(); - root.close(); - root.getLog().closeAndDelete(); - // Start up HBase cluster - hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); - + LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS)); } catch (Exception e) { if (dfsCluster != null) { dfsCluster.shutdown(); + dfsCluster = null; } + throw e; } } @@ -130,11 +98,13 @@ public class TestTableMapReduce extends HBaseTestCase { @Override public void tearDown() throws Exception { super.tearDown(); - if(hCluster != null) { hCluster.shutdown(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } /** @@ -190,18 +160,50 @@ public class TestTableMapReduce extends HBaseTestCase { output.collect(tKey, outval); } } - + /** - * Test HBase map/reduce + * Test hbase mapreduce jobs against single region and multi-region tables. + */ + public void testTableMapReduce() throws IOException { + localTestSingleRegionTable(); + localTestMultiRegionTable(); + } + + /* + * Test against a single region. * @throws IOException */ - @SuppressWarnings("static-access") - public void testTableMapReduce() throws IOException { - System.out.println("Print table contents before map/reduce"); - scanTable(conf); + private void localTestSingleRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. + HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // insert some data into the test table + HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); + + for(int i = 0; i < values.length; i++) { + long lockid = table.startUpdate(new Text("row_" + + String.format("%1$05d", i))); + + try { + table.put(lockid, TEXT_INPUT_COLUMN, values[i]); + table.commit(lockid, System.currentTimeMillis()); + lockid = -1; + } finally { + if (lockid != -1) + table.abort(lockid); + } + } + + LOG.info("Print table contents before map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); @SuppressWarnings("deprecation") - MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1); + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); try { JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); @@ -209,10 +211,11 @@ public class TestTableMapReduce extends HBaseTestCase { jobConf.setNumMapTasks(1); jobConf.setNumReduceTasks(1); - ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN, + TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, ProcessContentsMapper.class, jobConf); - IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf); + IdentityTableReduce.initJob(SINGLE_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); JobClient.runJob(jobConf); @@ -220,12 +223,63 @@ public class TestTableMapReduce extends HBaseTestCase { mrCluster.shutdown(); } - System.out.println("Print table contents after map/reduce"); - scanTable(conf); + LOG.info("Print table contents after map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); + + // verify map-reduce results + verify(conf, SINGLE_REGION_TABLE_NAME); } - private void scanTable(Configuration conf) throws IOException { - HTable table = new HTable(conf, new Text(TABLE_NAME)); + /* + * Test against multiple regions. + * @throws IOException + */ + private void localTestMultiRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. + HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // Populate a table into multiple regions + MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, + MULTI_REGION_TABLE_NAME, INPUT_COLUMN); + + // Verify table indeed has multiple regions + HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME)); + Text[] startKeys = table.getStartKeys(); + assertTrue(startKeys.length > 1); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(2); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + IdentityTableReduce.initJob(MULTI_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); + + JobClient.runJob(jobConf); + + } finally { + mrCluster.shutdown(); + } + + // verify map-reduce results + verify(conf, MULTI_REGION_TABLE_NAME); + } + + private void scanTable(Configuration conf, String tableName) + throws IOException { + HTable table = new HTable(conf, new Text(tableName)); Text[] columns = { TEXT_INPUT_COLUMN, @@ -239,13 +293,51 @@ public class TestTableMapReduce extends HBaseTestCase { TreeMap results = new TreeMap(); while(scanner.next(key, results)) { - System.out.print("row: " + key.getRow()); + LOG.info("row: " + key.getRow()); for(Map.Entry e: results.entrySet()) { - System.out.print(" column: " + e.getKey() + " value: " + LOG.info(" column: " + e.getKey() + " value: " + new String(e.getValue())); } - System.out.println(); + } + + } finally { + scanner.close(); + } + } + + private void verify(Configuration conf, String tableName) throws IOException { + HTable table = new HTable(conf, new Text(tableName)); + + Text[] columns = { + TEXT_INPUT_COLUMN, + TEXT_OUTPUT_COLUMN + }; + HScannerInterface scanner = + table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + + for(Map.Entry e: results.entrySet()) { + if (count == 0) + firstValue = e.getValue(); + if (count == 1) + secondValue = e.getValue(); + count++; + } + + // verify second value is the reverse of the first + assertEquals(firstValue.length, secondValue.length); + for (int i=0; i