HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
  (localTestSingleRegionTable, localTestMultiRegionTable, verify): Added.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
  Javadoc for addContent and the Loader interface and implementations.
  Methods have been made static so they are accessible w/o subclassing.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
  Guts of TestSplit have been moved here so other tests can have access to
  a multiregion table.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
  Bulk moved to the MultiRegionTable utility class. Use this new class
  instead.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
  Added '@deprecated' javadoc.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
  Was throwing RuntimeException when a msgQueue.put was interrupted, but
  this is a likely event on shutdown. Log a message instead.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
  The actual fix for HADOOP-1785: reverse the test of the row comparison.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@570983 13f79535-47bb-0310-9956-ffa450edef68
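The heart of the HADOOP-1785 fix is the end-row test in
TableRecordReader.next: the scan must stop once the current row reaches the
end row, not while it is still before it. A minimal standalone sketch of the
corrected predicate, with plain String standing in for the Text row keys
(this is an illustration, not the actual TableRecordReader code):

// EndRowCheck.java -- run with 'java -ea EndRowCheck' to exercise the asserts.
public class EndRowCheck {

  // Returns true while the scan should keep going, mirroring the fixed
  // condition: stop when an end row is set and row.compareTo(endRow) >= 0.
  static boolean hasMore(String row, String endRow) {
    if (endRow.length() > 0 && row.compareTo(endRow) >= 0) {
      return false; // at or past the (exclusive) end row: stop scanning
    }
    return true;
  }

  public static void main(String[] args) {
    assert hasMore("aaa", "bbb");   // before the end row: keep scanning
    assert !hasMore("bbb", "bbb");  // the end row itself is excluded: stop
    assert !hasMore("ccc", "bbb");  // past the end row: stop
    assert hasMore("zzz", "");      // empty end row: scan to end of table
  }
}

The buggy version tested compareTo(m_endRow) < 0, which stopped the scan
while rows were still inside the requested range and let it run past the end.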
parent 5dc1ee32c0
commit 59825ac875
src/contrib/hbase/CHANGES.txt
@@ -7,23 +7,25 @@ Trunk (unreleased changes)
   NEW FEATURES
     HADOOP-1768 FS command using Hadoop FsShell operations
-    (Edward Yoon via Stack)
+    (Edward Yoon via Stack)

   OPTIMIZATIONS

   BUG FIXES
     HADOOP-1527 Region server won't start because logdir exists
     HADOOP-1723 If master asks region server to shut down, by-pass return of
-    shutdown message
+    shutdown message
     HADOOP-1729 Recent renaming or META tables breaks hbase shell
     HADOOP-1730 unexpected null value causes META scanner to exit (silently)
     HADOOP-1747 On a cluster, on restart, regions multiply assigned
     HADOOP-1776 Fix for sporadic compaction failures closing and moving
-    compaction result
+    compaction result
     HADOOP-1780 Regions are still being doubly assigned
     HADOOP-1797 Fix NPEs in MetaScanner constructor
     HADOOP-1799 Incorrect classpath in binary version of Hadoop
     HADOOP-1805 Region server hang on exit
+    HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
+    (Ning Li via Stack)

   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
@@ -3014,13 +3014,16 @@ HMasterRegionInterface, Runnable {
       // NOTE: If the server was serving the root region, we cannot reassign it
       // here because the new server will start serving the root region before
       // the PendingServerShutdown operation has a chance to split the log file.

       try {
         if (info != null) {
           msgQueue.put(new PendingServerShutdown(info));
         }
       } catch (InterruptedException e) {
-        throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+        // continue.  We used to throw a RuntimeException here but on exit
+        // this put is often interrupted.  For now, just log these iterrupts
+        // rather than throw an exception
+        LOG.warn("MsgQueue.put was interrupted (If we are exiting, this msg " +
+          "can be ignored");
       }
     }
   }
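The pattern adopted here -- treat an interrupted put as an expected
shutdown-time event rather than a fatal error -- can be sketched in
isolation as follows. The queue and message types are stand-ins, not the
actual HMaster ones:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Standalone sketch of the new behavior: log and carry on instead of
// throwing, because the put is routinely interrupted while exiting.
class ShutdownTolerantQueue {
  private final BlockingQueue<String> msgQueue =
    new LinkedBlockingQueue<String>();

  void enqueue(String msg) {
    try {
      msgQueue.put(msg);
    } catch (InterruptedException e) {
      // Expected during shutdown; rethrowing as RuntimeException would
      // turn a normal exit into a spurious failure.
      System.err.println("msgQueue.put interrupted (ignorable if exiting)");
      Thread.currentThread().interrupt(); // restore the interrupt flag
    }
  }
}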
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
@@ -445,7 +445,7 @@ public class HTable implements HConstants {
    * @param row name of row to be updated
    * @return lockid to be used in subsequent put, delete and commit calls
    *
-   * Deprecated. Batch operations are now the default. startBatchUpdate is now
+   * @deprecated Batch operations are now the default. startBatchUpdate is now
    * implemented by @see {@link #startUpdate(Text)}
    */
   @Deprecated
@@ -457,7 +457,7 @@ public class HTable implements HConstants {
    * Abort a batch mutation
    * @param lockid lock id returned by startBatchUpdate
    *
-   * Deprecated. Batch operations are now the default. abortBatch is now
+   * @deprecated Batch operations are now the default. abortBatch is now
    * implemented by @see {@link #abort(long)}
    */
   @Deprecated
@@ -471,7 +471,7 @@ public class HTable implements HConstants {
    * @param lockid lock id returned by startBatchUpdate
    * @throws IOException
    *
-   * Deprecated. Batch operations are now the default. commitBatch(long) is now
+   * @deprecated Batch operations are now the default. commitBatch(long) is now
    * implemented by @see {@link #commit(long)}
    */
   @Deprecated
@@ -486,7 +486,7 @@ public class HTable implements HConstants {
    * @param timestamp time to associate with all the changes
    * @throws IOException
    *
-   * Deprecated. Batch operations are now the default. commitBatch(long, long)
+   * @deprecated Batch operations are now the default. commitBatch(long, long)
    * is now implemented by @see {@link #commit(long, long)}
    */
   @Deprecated
@@ -622,7 +622,7 @@ public class HTable implements HConstants {
    *
    * @param lockid - lock id returned from startUpdate
    *
-   * Deprecated. Batch updates are now the default. Consequently this method
+   * @deprecated Batch updates are now the default. Consequently this method
    * does nothing.
    */
   @Deprecated
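Taken together, these javadoc hunks point clients at the plain update calls:
batch behavior is now the default, so the *Batch methods are redundant. A
hedged sketch of the migration, using only signatures referenced in this
diff (the helper method and its arguments are hypothetical):

import java.io.IOException;
import org.apache.hadoop.io.Text;

// Hypothetical client helper showing the replacement the @deprecated tags
// describe: startUpdate/commit/abort now do what the batch calls did.
static void writeCell(HTable table, Text row, Text column, byte[] value)
throws IOException {
  long lockid = table.startUpdate(row); // was: table.startBatchUpdate(row)
  try {
    table.put(lockid, column, value);
    table.commit(lockid);               // was: table.commitBatch(lockid)
    lockid = -1;
  } finally {
    if (lockid != -1) {
      table.abort(lockid);              // was: table.abortBatch(lockid)
    }
  }
}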
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
@@ -144,7 +144,7 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {

       if(hasMore) {
         if(m_endRow.getLength() > 0 &&
-            (tKey.getRow().compareTo(m_endRow) < 0)) {
+            (tKey.getRow().compareTo(m_endRow) >= 0)) {

           hasMore = false;

src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -103,7 +103,15 @@ public abstract class HBaseTestCase extends TestCase {
     return htd;
   }

-  protected void addContent(final HRegion r, final String column)
+  /**
+   * Add content to region <code>r</code> on the passed column
+   * <code>column</code>.
+   * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
+   * @param r
+   * @param column
+   * @throws IOException
+   */
+  protected static void addContent(final HRegion r, final String column)
   throws IOException {
     Text startKey = r.getRegionInfo().getStartKey();
     Text endKey = r.getRegionInfo().getEndKey();
@@ -113,14 +121,32 @@ public abstract class HBaseTestCase extends TestCase {
     }
     addContent(new HRegionLoader(r), column, startKeyBytes, endKey);
   }

-  protected void addContent(final Loader updater, final String column)
+  /**
+   * Add content to region <code>r</code> on the passed column
+   * <code>column</code>.
+   * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
+   * @param updater An instance of {@link Loader}.
+   * @param column
+   * @throws IOException
+   */
+  protected static void addContent(final Loader updater, final String column)
   throws IOException {
     addContent(updater, column,
       new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null);
   }

-  protected void addContent(final Loader updater, final String column,
+  /**
+   * Add content to region <code>r</code> on the passed column
+   * <code>column</code>.
+   * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
+   * @param updater An instance of {@link Loader}.
+   * @param column
+   * @param startKeyBytes Where to start the rows inserted
+   * @param endKey Where to stop inserting rows.
+   * @throws IOException
+   */
+  protected static void addContent(final Loader updater, final String column,
     final byte [] startKeyBytes, final Text endKey)
   throws IOException {
     // Add rows of three characters.  The first character starts with the
@@ -156,14 +182,21 @@ public abstract class HBaseTestCase extends TestCase {
     }
   }

-  public interface Loader {
+  /**
+   * Interface used by the addContent methods so either a HTable or a HRegion
+   * can be passed to the methods.
+   */
+  public static interface Loader {
     public long startBatchUpdate(final Text row) throws IOException;
     public void put(long lockid, Text column, byte val[]) throws IOException;
     public void commit(long lockid) throws IOException;
     public void abort(long lockid) throws IOException;
   }

-  public class HRegionLoader implements Loader {
+  /**
+   * A class that makes a {@link Loader} out of a {@link HRegion}
+   */
+  public static class HRegionLoader implements Loader {
     final HRegion region;
     public HRegionLoader(final HRegion HRegion) {
       super();
@@ -182,8 +215,11 @@ public abstract class HBaseTestCase extends TestCase {
       return this.region.startUpdate(row);
     }
   }

-  public class HTableLoader implements Loader {
+  /**
+   * A class that makes a {@link Loader} out of a {@link HTable}
+   */
+  public static class HTableLoader implements Loader {
     final HTable table;
     public HTableLoader(final HTable table) {
       super();
@@ -199,7 +235,7 @@ public abstract class HBaseTestCase extends TestCase {
       this.table.put(lockid, column, val);
     }
     public long startBatchUpdate(Text row) {
-      return this.table.startBatchUpdate(row);
+      return this.table.startUpdate(row);
     }
   }
 }
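For orientation, a hedged sketch of how a test can now drive the static
addContent helper through the Loader abstraction. The test class and table
name are hypothetical; HTableLoader, addContent, and the inherited conf
field come from the diff above:

import java.io.IOException;
import org.apache.hadoop.io.Text;

// Hypothetical test: the same addContent call would also accept an
// HRegionLoader, since both loaders implement the Loader interface.
public class LoaderUsageSketch extends HBaseTestCase {
  public void testFillTable() throws IOException {
    HTable table = new HTable(conf, new Text("test")); // table must exist
    // Static helper, so the caller only needs visibility of HBaseTestCase,
    // not a particular subclass relationship:
    addContent(new HTableLoader(table), "contents:");
  }
}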
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (new file)
@@ -0,0 +1,257 @@
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;

/**
 * Utility class to build a table of multiple regions.
 */
public class MultiRegionTable extends HBaseTestCase {
  static final Log LOG = LogFactory.getLog(MultiRegionTable.class.getName());

  /**
   * Make a multi-region table.  Presumption is that table already exists.
   * Makes it multi-region by filling with data and provoking splits.
   * Asserts parent region is cleaned up after its daughter splits release all
   * references.
   * @param conf
   * @param cluster
   * @param localFs
   * @param tableName
   * @param columnName
   * @throws IOException
   */
  public static void makeMultiRegionTable(Configuration conf,
      MiniHBaseCluster cluster, FileSystem localFs, String tableName,
      String columnName)
  throws IOException {
    // This size should make it so we always split using the addContent
    // below.  After adding all data, the first region is 1.3M.  Should
    // set max filesize to be <= 1M.
    assertTrue(conf.getLong("hbase.hregion.max.filesize",
      HConstants.DEFAULT_MAX_FILE_SIZE) <= 1024 * 1024);

    final int retries = 10;
    Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
    FileSystem fs = (cluster.getDFSCluster() == null) ?
      localFs : cluster.getDFSCluster().getFileSystem();
    assertTrue(fs != null);

    // Get connection on the meta table and get count of rows.
    HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
    int count = count(meta, HConstants.COLUMN_FAMILY_STR);
    HTable t = new HTable(conf, new Text(tableName));
    addContent(new HTableLoader(t), columnName);
    // All is running in the one JVM so I should be able to get the single
    // region instance and bring on a split.
    HRegionInfo hri =
      t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
    HRegion r = cluster.regionThreads.get(0).getRegionServer().
      onlineRegions.get(hri.getRegionName());
    // Flush will provoke a split next time the split-checker thread runs.
    r.flushcache(false);
    // Now, wait until split makes it into the meta table.
    for (int i = 0; i < retries &&
        (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // continue
      }
    }
    int oldCount = count;
    count = count(meta, HConstants.COLUMN_FAMILY_STR);
    if (count <= oldCount) {
      throw new IOException("Failed waiting on splits to show up");
    }
    // Get info on the parent from the meta table.  Pass in 'hri'.  Its the
    // region we have been dealing with up to this.  Its the parent of the
    // region split.
    Map<Text, byte []> data = getSplitParentInfo(meta, hri);
    HRegionInfo parent =
      Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO));
    assertTrue(parent.isOffline());
    assertTrue(parent.isSplit());
    HRegionInfo splitA =
      Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
    HRegionInfo splitB =
      Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
    Path parentDir = HRegion.getRegionDir(d, parent.getRegionName());
    assertTrue(fs.exists(parentDir));
    LOG.info("Split happened. Parent is " + parent.getRegionName() +
      " and daughters are " + splitA.getRegionName() + ", " +
      splitB.getRegionName());
    // Recalibrate will cause us to wait on new regions' deployment
    recalibrate(t, new Text(columnName), retries);
    // Compact a region at a time so we can test case where one region has
    // no references but the other still has some
    compact(cluster, splitA);
    // Wait till the parent only has reference to remaining split, one that
    // still has references.
    while (getSplitParentInfo(meta, parent).size() == 3) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // continue
      }
    }
    LOG.info("Parent split returned " +
      getSplitParentInfo(meta, parent).keySet().toString());
    // Call second split.
    compact(cluster, splitB);
    // Now wait until parent disappears.
    LOG.info("Waiting on parent " + parent.getRegionName() +
      " to disappear");
    for (int i = 0; i < retries &&
        getSplitParentInfo(meta, parent) != null; i++) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertTrue(getSplitParentInfo(meta, parent) == null);
    // Assert cleaned up.
    for (int i = 0; i < retries && fs.exists(parentDir); i++) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertFalse(fs.exists(parentDir));
  }

  /*
   * Count of rows in table for given column.
   * @param t
   * @param column
   * @return
   * @throws IOException
   */
  private static int count(final HTable t, final String column)
  throws IOException {
    int size = 0;
    Text [] cols = new Text[] {new Text(column)};
    HScannerInterface s = t.obtainScanner(cols, HConstants.EMPTY_START_ROW,
      System.currentTimeMillis(), null);
    try {
      HStoreKey curKey = new HStoreKey();
      TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
      while(s.next(curKey, curVals)) {
        size++;
      }
      return size;
    } finally {
      s.close();
    }
  }

  /*
   * @return Return row info for passed in region or null if not found in scan.
   */
  private static Map<Text, byte []> getSplitParentInfo(final HTable t,
    final HRegionInfo parent)
  throws IOException {
    HScannerInterface s = t.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY,
      HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
    try {
      HStoreKey curKey = new HStoreKey();
      TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
      while(s.next(curKey, curVals)) {
        HRegionInfo hri = Writables.
          getHRegionInfoOrNull(curVals.get(HConstants.COL_REGIONINFO));
        if (hri == null) {
          continue;
        }
        if (hri.getRegionName().toString().
            equals(parent.getRegionName().toString())) {
          return curVals;
        }
      }
      return null;
    } finally {
      s.close();
    }
  }

  /*
   * Recalibrate passed in HTable.  Run after change in region geography.
   * Open a scanner on the table.  This will force HTable to recalibrate
   * and in doing so, will force us to wait until the new child regions
   * come on-line (since they are no longer automatically served by the
   * HRegionServer that was serving the parent.  In this test they will
   * end up on the same server (since there is only one), but we have to
   * wait until the master assigns them.
   * @param t
   * @param retries
   */
  private static void recalibrate(final HTable t, final Text column,
      final int retries) throws IOException {
    for (int i = 0; i < retries; i++) {
      try {
        HScannerInterface s =
          t.obtainScanner(new Text[] {column}, HConstants.EMPTY_START_ROW);
        try {
          HStoreKey key = new HStoreKey();
          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
          s.next(key, results);
          break;
        } finally {
          s.close();
        }
      } catch (NotServingRegionException x) {
        System.out.println("it's alright");
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }

  /*
   * Compact the passed in region <code>r</code>.
   * @param cluster
   * @param r
   * @throws IOException
   */
  private static void compact(final MiniHBaseCluster cluster,
      final HRegionInfo r)
  throws IOException {
    LOG.info("Starting compaction");
    for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) {
      SortedMap<Text, HRegion> regions =
        thread.getRegionServer().onlineRegions;
      // Retry if ConcurrentModification... alternative of sync'ing is not
      // worth it for sake of unit test.
      for (int i = 0; i < 10; i++) {
        try {
          for (HRegion online: regions.values()) {
            if (online.getRegionName().toString().
                equals(r.getRegionName().toString())) {
              online.compactStores();
            }
          }
          break;
        } catch (ConcurrentModificationException e) {
          LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
            "two should be fine");
          continue;
        }
      }
    }
  }
}
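A hedged usage sketch of the new utility, modeled on the slimmed-down
TestSplit body later in this diff. Everything below leans on members the
diff itself shows (conf, localFs, COLFAMILY_NAME3, createTableDescriptor);
the wrapper class is hypothetical:

import java.io.IOException;

// Hypothetical test: create a table, then let the utility fill it and
// provoke splits until it spans multiple regions.
public class MakeMultiRegionSketch extends HBaseTestCase {
  public void testMakeMultiRegion() throws IOException {
    // Keep regions small so addContent reliably forces a split; the
    // utility asserts the configured max filesize is <= 1M.
    conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
    MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true);
    try {
      HBaseAdmin admin = new HBaseAdmin(conf);
      admin.createTable(createTableDescriptor(getName()));
      MultiRegionTable.makeMultiRegionTable(conf, cluster, localFs,
        getName(), COLFAMILY_NAME3);
    } finally {
      cluster.shutdown();
    }
  }
}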
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
@@ -20,16 +20,10 @@
 package org.apache.hadoop.hbase;

 import java.io.IOException;
-import java.util.ConcurrentModificationException;
-import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -148,158 +142,24 @@ public class TestSplit extends HBaseTestCase {
   /**
    * Test that a region is cleaned up after its daughter splits release all
    * references.
-   * @throws Exception
+   * @throws IOException
    */
-  public void testSplitRegionIsDeleted() throws Exception {
-    final int retries = 10;
+  public void testSplitRegionIsDeleted() throws IOException {
     // Start up a hbase cluster
     MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true);
-    Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
-    FileSystem fs = (cluster.getDFSCluster() == null)?
-      this.localFs:
-      cluster.getDFSCluster().getFileSystem();
-    HTable meta = null;
-    HTable t = null;
     try {
       // Create a table.
       HBaseAdmin admin = new HBaseAdmin(this.conf);
       admin.createTable(createTableDescriptor(getName()));
-      // Get connection on the meta table and get count of rows.
-      meta = new HTable(this.conf, HConstants.META_TABLE_NAME);
-      int count = count(meta, HConstants.COLUMN_FAMILY_STR);
-      t = new HTable(this.conf, new Text(getName()));
-      addContent(new HTableLoader(t), COLFAMILY_NAME3);
-      // All is running in the one JVM so I should be able to get the single
-      // region instance and bring on a split.
-      HRegionInfo hri =
-        t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
-      HRegion r = cluster.regionThreads.get(0).getRegionServer().
-        onlineRegions.get(hri.getRegionName());
-      // Flush will provoke a split next time the split-checker thread runs.
-      r.flushcache(false);
-      // Now, wait until split makes it into the meta table.
-      for (int i = 0; i < retries &&
-          (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
-        Thread.sleep(5000);
-      }
-      int oldCount = count;
-      count = count(meta, HConstants.COLUMN_FAMILY_STR);
-      if (count <= oldCount) {
-        throw new IOException("Failed waiting on splits to show up");
-      }
-      // Get info on the parent from the meta table.  Pass in 'hri'.  Its the
-      // region we have been dealing with up to this.  Its the parent of the
-      // region split.
-      Map<Text, byte []> data = getSplitParentInfo(meta, hri);
-      HRegionInfo parent =
-        Writables.getHRegionInfoOrNull(data.get(HConstants.COL_REGIONINFO));
-      assertTrue(parent.isOffline());
-      assertTrue(parent.isSplit());
-      HRegionInfo splitA =
-        Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
-      HRegionInfo splitB =
-        Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
-      Path parentDir = HRegion.getRegionDir(d, parent.getRegionName());
-      assertTrue(fs.exists(parentDir));
-      LOG.info("Split happened. Parent is " + parent.getRegionName() +
-        " and daughters are " + splitA.getRegionName() + ", " +
-        splitB.getRegionName());
-      // Recalibrate will cause us to wait on new regions' deployment
-      recalibrate(t, new Text(COLFAMILY_NAME3), retries);
-      // Compact a region at a time so we can test case where one region has
-      // no references but the other still has some
-      compact(cluster, splitA);
-      // Wait till the parent only has reference to remaining split, one that
-      // still has references.
-      while (getSplitParentInfo(meta, parent).size() == 3) {
-        Thread.sleep(5000);
-      }
-      LOG.info("Parent split returned " +
-        getSplitParentInfo(meta, parent).keySet().toString());
-      // Call second split.
-      compact(cluster, splitB);
-      // Now wait until parent disappears.
-      LOG.info("Waiting on parent " + parent.getRegionName() +
-        " to disappear");
-      for (int i = 0; i < retries &&
-          getSplitParentInfo(meta, parent) != null; i++) {
-        Thread.sleep(5000);
-      }
-      assertTrue(getSplitParentInfo(meta, parent) == null);
-      // Assert cleaned up.
-      for (int i = 0; i < retries && fs.exists(parentDir); i++) {
-        Thread.sleep(5000);
-      }
-      assertFalse(fs.exists(parentDir));
+      // This builds a multi-region table by splitting.  It will assert
+      // the parent region gets cleaned-up.
+      MultiRegionTable.makeMultiRegionTable(conf, cluster,
+        this.localFs, getName(), COLFAMILY_NAME3);
     } finally {
       cluster.shutdown();
     }
   }

-  /*
-   * Compact the passed in region <code>r</code>.
-   * @param cluster
-   * @param r
-   * @throws IOException
-   */
-  private void compact(final MiniHBaseCluster cluster, final HRegionInfo r)
-  throws IOException {
-    LOG.info("Starting compaction");
-    for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) {
-      SortedMap<Text, HRegion> regions =
-        thread.getRegionServer().onlineRegions;
-      // Retry if ConcurrentModification... alternative of sync'ing is not
-      // worth it for sake of unit test.
-      for (int i = 0; i < 10; i++) {
-        try {
-          for (HRegion online: regions.values()) {
-            if (online.getRegionName().toString().
-                equals(r.getRegionName().toString())) {
-              online.compactStores();
-            }
-          }
-          break;
-        } catch (ConcurrentModificationException e) {
-          LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
-            "two should be fine");
-          continue;
-        }
-      }
-    }
-  }
-
-  /*
-   * Recalibrate passed in HTable.  Run after change in region geography.
-   * Open a scanner on the table.  This will force HTable to recalibrate
-   * and in doing so, will force us to wait until the new child regions
-   * come on-line (since they are no longer automatically served by the
-   * HRegionServer that was serving the parent.  In this test they will
-   * end up on the same server (since there is only one), but we have to
-   * wait until the master assigns them.
-   * @param t
-   * @param retries
-   */
-  private void recalibrate(final HTable t, final Text column,
-      final int retries)
-  throws IOException, InterruptedException {
-    for (int i = 0; i < retries; i++) {
-      try {
-        HScannerInterface s =
-          t.obtainScanner(new Text[] {column}, HConstants.EMPTY_START_ROW);
-        try {
-          HStoreKey key = new HStoreKey();
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-          s.next(key, results);
-          break;
-        } finally {
-          s.close();
-        }
-      } catch (NotServingRegionException x) {
-        Thread.sleep(5000);
-      }
-    }
-  }
-
   private void assertGet(final HRegion r, final String family, final Text k)
   throws IOException {
     // Now I have k, get values out and assert they are as expected.
@@ -312,59 +172,6 @@ public class TestSplit extends HBaseTestCase {
     }
   }

-  /*
-   * @return Return row info for passed in region or null if not found in scan.
-   */
-  private Map<Text, byte []> getSplitParentInfo(final HTable t,
-    final HRegionInfo parent)
-  throws IOException {
-    HScannerInterface s = t.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY,
-      HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
-    try {
-      HStoreKey curKey = new HStoreKey();
-      TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
-      while(s.next(curKey, curVals)) {
-        HRegionInfo hri = Writables.
-          getHRegionInfoOrNull(curVals.get(HConstants.COL_REGIONINFO));
-        if (hri == null) {
-          continue;
-        }
-        if (hri.getRegionName().toString().
-            equals(parent.getRegionName().toString())) {
-          return curVals;
-        }
-      }
-      return null;
-    } finally {
-      s.close();
-    }
-  }
-
-  /*
-   * Count of rows in table for given column.
-   * @param t
-   * @param column
-   * @return
-   * @throws IOException
-   */
-  private int count(final HTable t, final String column)
-  throws IOException {
-    int size = 0;
-    Text [] cols = new Text[] {new Text(column)};
-    HScannerInterface s = t.obtainScanner(cols, HConstants.EMPTY_START_ROW,
-      System.currentTimeMillis(), null);
-    try {
-      HStoreKey curKey = new HStoreKey();
-      TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
-      while(s.next(curKey, curVals)) {
-        size++;
-      }
-      return size;
-    } finally {
-      s.close();
-    }
-  }
-
   /*
    * Assert first value in the passed region is <code>firstValue</code>.
    * @param r
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
@@ -21,26 +21,21 @@ package org.apache.hadoop.hbase;

 import java.io.IOException;
 import java.util.Map;
-import java.util.Random;
 import java.util.TreeMap;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;

 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.mapred.TableMap;
 import org.apache.hadoop.hbase.mapred.TableOutputCollector;
 import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
@@ -49,15 +44,16 @@ import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
  * Test Map/Reduce job over HBase tables
  */
 public class TestTableMapReduce extends HBaseTestCase {
-  static final String TABLE_NAME = "test";
+  private static final Log LOG =
+    LogFactory.getLog(TestTableMapReduce.class.getName());

+  static final String SINGLE_REGION_TABLE_NAME = "srtest";
+  static final String MULTI_REGION_TABLE_NAME = "mrtest";
   static final String INPUT_COLUMN = "contents:";
   static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
   static final String OUTPUT_COLUMN = "text:";
   static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);

-  private Random rand;
-  private HTableDescriptor desc;
-
   private MiniDFSCluster dfsCluster = null;
   private FileSystem fs;
   private Path dir;
@@ -76,51 +72,23 @@ public class TestTableMapReduce extends HBaseTestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    rand = new Random();
-    desc = new HTableDescriptor("test");
-    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
-
+    // This size is picked so the table is split into two
+    // after addContent in testMultiRegionTableMapReduce.
+    conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
     dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
     try {
       fs = dfsCluster.getFileSystem();
       dir = new Path("/hbase");
       fs.mkdirs(dir);
-
-      // create the root and meta regions and insert the data region into the meta
-
-      HRegion root = createNewHRegion(dir, conf, HGlobals.rootTableDesc, 0L, null, null);
-      HRegion meta = createNewHRegion(dir, conf, HGlobals.metaTableDesc, 1L, null, null);
-      HRegion.addRegionToMETA(root, meta);
-
-      HRegion region = createNewHRegion(dir, conf, desc, rand.nextLong(), null, null);
-      HRegion.addRegionToMETA(meta, region);
-
-      // insert some data into the test table
-
-      for(int i = 0; i < values.length; i++) {
-        long lockid = region.startUpdate(new Text("row_"
-            + String.format("%1$05d", i)));
-
-        region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
-        region.commit(lockid, System.currentTimeMillis());
-      }
-
-      region.close();
-      region.getLog().closeAndDelete();
-      meta.close();
-      meta.getLog().closeAndDelete();
-      root.close();
-      root.getLog().closeAndDelete();
-
       // Start up HBase cluster
-
       hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+      LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS));
     } catch (Exception e) {
       if (dfsCluster != null) {
         dfsCluster.shutdown();
         dfsCluster = null;
       }
       throw e;
     }
   }
@@ -130,11 +98,13 @@ public class TestTableMapReduce extends HBaseTestCase {
   @Override
   public void tearDown() throws Exception {
     super.tearDown();

     if(hCluster != null) {
       hCluster.shutdown();
     }

+    if (dfsCluster != null) {
       dfsCluster.shutdown();
+    }
   }
@@ -190,18 +160,50 @@ public class TestTableMapReduce extends HBaseTestCase {
       output.collect(tKey, outval);
     }
   }

   /**
-   * Test HBase map/reduce
+   * Test hbase mapreduce jobs against single region and multi-region tables.
    */
+  public void testTableMapReduce() throws IOException {
+    localTestSingleRegionTable();
+    localTestMultiRegionTable();
+  }
+
+  /*
+   * Test against a single region.
+   * @throws IOException
+   */
   @SuppressWarnings("static-access")
-  public void testTableMapReduce() throws IOException {
-    System.out.println("Print table contents before map/reduce");
-    scanTable(conf);
+  private void localTestSingleRegionTable() throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+    // Create a table.
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    admin.createTable(desc);
+
+    // insert some data into the test table
+    HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME));
+
+    for(int i = 0; i < values.length; i++) {
+      long lockid = table.startUpdate(new Text("row_"
+          + String.format("%1$05d", i)));
+
+      try {
+        table.put(lockid, TEXT_INPUT_COLUMN, values[i]);
+        table.commit(lockid, System.currentTimeMillis());
+        lockid = -1;
+      } finally {
+        if (lockid != -1)
+          table.abort(lockid);
+      }
+    }
+
+    LOG.info("Print table contents before map/reduce");
+    scanTable(conf, SINGLE_REGION_TABLE_NAME);

     @SuppressWarnings("deprecation")
-    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getName(), 1);
+    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);

     try {
       JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
@@ -209,10 +211,11 @@ public class TestTableMapReduce extends HBaseTestCase {
       jobConf.setNumMapTasks(1);
       jobConf.setNumReduceTasks(1);

-      ProcessContentsMapper.initJob(TABLE_NAME, INPUT_COLUMN,
+      TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
         ProcessContentsMapper.class, jobConf);

-      IdentityTableReduce.initJob(TABLE_NAME, IdentityTableReduce.class, jobConf);
+      IdentityTableReduce.initJob(SINGLE_REGION_TABLE_NAME,
+        IdentityTableReduce.class, jobConf);

       JobClient.runJob(jobConf);

@@ -220,12 +223,63 @@ public class TestTableMapReduce extends HBaseTestCase {
       mrCluster.shutdown();
     }

-    System.out.println("Print table contents after map/reduce");
-    scanTable(conf);
+    LOG.info("Print table contents after map/reduce");
+    scanTable(conf, SINGLE_REGION_TABLE_NAME);
+
+    // verify map-reduce results
+    verify(conf, SINGLE_REGION_TABLE_NAME);
   }

-  private void scanTable(Configuration conf) throws IOException {
-    HTable table = new HTable(conf, new Text(TABLE_NAME));
+  /*
+   * Test against multiple regions.
+   * @throws IOException
+   */
+  private void localTestMultiRegionTable() throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+    // Create a table.
+    HBaseAdmin admin = new HBaseAdmin(this.conf);
+    admin.createTable(desc);
+
+    // Populate a table into multiple regions
+    MultiRegionTable.makeMultiRegionTable(conf, hCluster, null,
+      MULTI_REGION_TABLE_NAME, INPUT_COLUMN);
+
+    // Verify table indeed has multiple regions
+    HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
+    Text[] startKeys = table.getStartKeys();
+    assertTrue(startKeys.length > 1);
+
+    @SuppressWarnings("deprecation")
+    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+    try {
+      JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+      jobConf.setJobName("process column contents");
+      jobConf.setNumMapTasks(2);
+      jobConf.setNumReduceTasks(1);
+
+      TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
+        ProcessContentsMapper.class, jobConf);
+
+      IdentityTableReduce.initJob(MULTI_REGION_TABLE_NAME,
+        IdentityTableReduce.class, jobConf);
+
+      JobClient.runJob(jobConf);
+
+    } finally {
+      mrCluster.shutdown();
+    }
+
+    // verify map-reduce results
+    verify(conf, MULTI_REGION_TABLE_NAME);
+  }
+
+  private void scanTable(Configuration conf, String tableName)
+  throws IOException {
+    HTable table = new HTable(conf, new Text(tableName));

     Text[] columns = {
       TEXT_INPUT_COLUMN,
@@ -239,13 +293,51 @@ public class TestTableMapReduce extends HBaseTestCase {
       TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();

       while(scanner.next(key, results)) {
-        System.out.print("row: " + key.getRow());
+        LOG.info("row: " + key.getRow());

         for(Map.Entry<Text, byte[]> e: results.entrySet()) {
-          System.out.print(" column: " + e.getKey() + " value: "
+          LOG.info(" column: " + e.getKey() + " value: "
             + new String(e.getValue()));
         }
-        System.out.println();
       }

     } finally {
       scanner.close();
     }
   }

+  private void verify(Configuration conf, String tableName) throws IOException {
+    HTable table = new HTable(conf, new Text(tableName));
+
+    Text[] columns = {
+      TEXT_INPUT_COLUMN,
+      TEXT_OUTPUT_COLUMN
+    };
+    HScannerInterface scanner =
+      table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
+
+    try {
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+      while(scanner.next(key, results)) {
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+
+        for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+          if (count == 0)
+            firstValue = e.getValue();
+          if (count == 1)
+            secondValue = e.getValue();
+          count++;
+        }
+
+        // verify second value is the reverse of the first
+        assertEquals(firstValue.length, secondValue.length);
+        for (int i=0; i<firstValue.length; i++) {
+          assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
+        }
+      }
+
+    } finally {