HBASE-18090 Improve TableSnapshotInputFormat to allow more multiple mappers per region

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
libisthanks 2017-11-09 10:53:22 +08:00 committed by Michael Stack
parent 24d82195cb
commit 41b4877950
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
12 changed files with 380 additions and 47 deletions

View File

@ -151,7 +151,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions; int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util, org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1,
expectedNumSplits, false); expectedNumSplits, false);
} else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) { } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
/* /*
@ -165,7 +165,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
int expectedNumSplits = numRegions; int expectedNumSplits = numRegions;
org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util, org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1,
expectedNumSplits, false); expectedNumSplits, false);
} else { } else {
throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +"."); throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +".");

View File

@ -55,6 +55,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
// region is immutable, set isolation level // region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
htd.setReadOnly(true);
// open region from the snapshot directory // open region from the snapshot directory
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -193,6 +194,43 @@ public class TableMapReduceUtil {
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
} }
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
* @param snapshotName The name of the snapshot (of a table) to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param jobConf The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restore directory can be deleted.
* @param splitAlgo algorithm to split
* @param numSplitsPerRegion how many input splits to generate per one region
* @throws IOException When setting up the details fails.
* @see TableSnapshotInputFormat
*/
public static void initTableSnapshotMapJob(String snapshotName, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf jobConf,
boolean addDependencyJars, Path tmpRestoreDir,
RegionSplitter.SplitAlgorithm splitAlgo,
int numSplitsPerRegion)
throws IOException {
TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
numSplitsPerRegion);
initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
addDependencyJars, TableSnapshotInputFormat.class);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
}
/** /**
* Use this before submitting a TableReduce job. It will * Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf. * appropriately set up the JobConf.

View File

@ -27,11 +27,13 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
@ -165,4 +167,20 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWrita
throws IOException { throws IOException {
TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir); TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
} }
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param job the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param splitAlgo split algorithm to generate splits from region
* @param numSplitsPerRegion how many input splits to generate per one region
* @throws IOException if an error occurs
*/
public static void setInput(JobConf job, String snapshotName, Path restoreDir,
RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo, numSplitsPerRegion);
}
} }

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
/** /**
* Utility for {@link TableMapper} and {@link TableReducer} * Utility for {@link TableMapper} and {@link TableReducer}
@ -368,6 +369,45 @@ public class TableMapReduceUtil {
resetCacheConfig(job.getConfiguration()); resetCacheConfig(job.getConfiguration());
} }
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
* @param snapshotName The name of the snapshot (of a table) to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restore directory can be deleted.
* @param splitAlgo algorithm to split
* @param numSplitsPerRegion how many input splits to generate per one region
* @throws IOException When setting up the details fails.
* @see TableSnapshotInputFormat
*/
public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, Path tmpRestoreDir,
RegionSplitter.SplitAlgorithm splitAlgo,
int numSplitsPerRegion)
throws IOException {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
numSplitsPerRegion);
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
resetCacheConfig(job.getConfiguration());
}
/** /**
* Use this before submitting a Multi TableMap job. It will appropriately set * Use this before submitting a Multi TableMap job. It will appropriately set
* up the job. * up the job.

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
@ -66,8 +67,10 @@ import java.util.List;
* } * }
* </pre> * </pre>
* <p> * <p>
* Internally, this input format restores the snapshot into the given tmp directory. Similar to * Internally, this input format restores the snapshot into the given tmp directory. By default,
* {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading * and similar to {@link TableInputFormat} an InputSplit is created per region, but optionally you
* can run N mapper tasks per every region, in which case the region key range will be split to
* N sub-ranges and an InputSplit will be created per sub-range. The region is opened for reading
* from each RecordReader. An internal RegionScanner is used to execute the * from each RecordReader. An internal RegionScanner is used to execute the
* {@link org.apache.hadoop.hbase.CellScanner} obtained from the user. * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
* <p> * <p>
@ -204,4 +207,21 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
throws IOException { throws IOException {
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir); TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
} }
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param job the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param splitAlgo split algorithm to generate splits from region
* @param numSplitsPerRegion how many input splits to generate per one region
* @throws IOException if an error occurs
*/
public static void setInput(Job job, String snapshotName, Path restoreDir,
RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
splitAlgo, numSplitsPerRegion);
}
} }

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -75,6 +76,17 @@ public class TableSnapshotInputFormatImpl {
"hbase.tablesnapshotinputformat.locality.cutoff.multiplier"; "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f; private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
/**
* For MapReduce jobs running multiple mappers per region, determines
* what split algorithm we should be using to find split points for scanners.
*/
public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
/**
* For MapReduce jobs running multiple mappers per region, determines
* number of splits to generate per region.
*/
public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
/** /**
* Implementation class for InputSplit logic common between mapred and mapreduce. * Implementation class for InputSplit logic common between mapred and mapreduce.
*/ */
@ -263,7 +275,30 @@ public class TableSnapshotInputFormatImpl {
// the temp dir where the snapshot is restored // the temp dir where the snapshot is restored
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
return getSplits(scan, manifest, regionInfos, restoreDir, conf); RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);
int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);
return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
}
public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{
String splitAlgoClassName = conf.get(SPLIT_ALGO);
if (splitAlgoClassName == null)
return null;
try {
return ((Class<? extends RegionSplitter.SplitAlgorithm>)
Class.forName(splitAlgoClassName)).newInstance();
} catch (ClassNotFoundException e) {
throw new IOException("SplitAlgo class " + splitAlgoClassName +
" is not found", e);
} catch (InstantiationException e) {
throw new IOException("SplitAlgo class " + splitAlgoClassName +
" is not instantiable", e);
} catch (IllegalAccessException e) {
throw new IOException("SplitAlgo class " + splitAlgoClassName +
" is not instantiable", e);
}
} }
public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) { public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
@ -310,6 +345,12 @@ public class TableSnapshotInputFormatImpl {
public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest, public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException { List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
}
public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
List<HRegionInfo> regionManifests, Path restoreDir,
Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
// load table descriptor // load table descriptor
HTableDescriptor htd = manifest.getTableDescriptor(); HTableDescriptor htd = manifest.getTableDescriptor();
@ -319,6 +360,25 @@ public class TableSnapshotInputFormatImpl {
for (HRegionInfo hri : regionManifests) { for (HRegionInfo hri : regionManifests) {
// load region descriptor // load region descriptor
if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
sp[i + 1])) {
// compute HDFS locations from snapshot files (which will get the locations for
// referred hfiles)
List<String> hosts = getBestLocations(conf,
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
int len = Math.min(3, hosts.size());
hosts = hosts.subList(0, len);
Scan boundedScan = new Scan(scan);
boundedScan.setStartRow(sp[i]);
boundedScan.setStopRow(sp[i + 1]);
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
}
}
} else {
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
hri.getEndKey())) { hri.getEndKey())) {
// compute HDFS locations from snapshot files (which will get the locations for // compute HDFS locations from snapshot files (which will get the locations for
@ -331,6 +391,7 @@ public class TableSnapshotInputFormatImpl {
splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
} }
} }
}
return splits; return splits;
@ -397,6 +458,35 @@ public class TableSnapshotInputFormatImpl {
*/ */
public static void setInput(Configuration conf, String snapshotName, Path restoreDir) public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
throws IOException { throws IOException {
setInput(conf, snapshotName, restoreDir, null, 1);
}
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param conf the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param numSplitsPerRegion how many input splits to generate per one region
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @throws IOException if an error occurs
*/
public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
throws IOException {
conf.set(SNAPSHOT_NAME_KEY, snapshotName);
if (numSplitsPerRegion < 1) {
throw new IllegalArgumentException("numSplits must be >= 1, " +
"illegal numSplits : " + numSplitsPerRegion);
}
if (splitAlgo == null && numSplitsPerRegion > 1) {
throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
}
if (splitAlgo != null) {
conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
}
conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
conf.set(SNAPSHOT_NAME_KEY, snapshotName); conf.set(SNAPSHOT_NAME_KEY, snapshotName);
Path rootDir = FSUtils.getRootDir(conf); Path rootDir = FSUtils.getRootDir(conf);

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
@ -177,6 +178,17 @@ public class RegionSplitter {
*/ */
byte[][] split(int numRegions); byte[][] split(int numRegions);
/**
* Some MapReduce jobs may want to run multiple mappers per region,
* this is intended for such usecase.
*
* @param start first row (inclusive)
* @param end last row (exclusive)
* @param numSplits number of splits to generate
* @param inclusive whether start and end are returned as split points
*/
byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);
/** /**
* In HBase, the first row is represented by an empty byte array. This might * In HBase, the first row is represented by an empty byte array. This might
* cause problems with your split algorithm or row printing. All your APIs * cause problems with your split algorithm or row printing. All your APIs
@ -960,6 +972,39 @@ public class RegionSplitter {
return convertToBytes(splits); return convertToBytes(splits);
} }
@Override
public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
BigInteger s = convertToBigInteger(start);
BigInteger e = convertToBigInteger(end);
Preconditions.checkArgument(e.compareTo(s) > 0,
"last row (%s) is configured less than first row (%s)", rowToStr(end),
end);
// +1 to range because the last row is inclusive
BigInteger range = e.subtract(s).add(BigInteger.ONE);
Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
"split granularity (%s) is greater than the range (%s)", numSplits, range);
BigInteger[] splits = new BigInteger[numSplits - 1];
BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
for (int i = 1; i < numSplits; i++) {
// NOTE: this means the last region gets all the slop.
// This is not a big deal if we're assuming n << MAXHEX
splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
.valueOf(i)));
}
if (inclusive) {
BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
inclusiveSplitPoints[0] = convertToBigInteger(start);
inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
return convertToBytes(inclusiveSplitPoints);
} else {
return convertToBytes(splits);
}
}
public byte[] firstRow() { public byte[] firstRow() {
return convertToByte(firstRowInt); return convertToByte(firstRowInt);
} }
@ -1104,6 +1149,31 @@ public class RegionSplitter {
return Arrays.copyOfRange(splits, 1, splits.length - 1); return Arrays.copyOfRange(splits, 1, splits.length - 1);
} }
public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
start = firstRowBytes;
}
if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
end = lastRowBytes;
}
Preconditions.checkArgument(
Bytes.compareTo(end, start) > 0,
"last row (%s) is configured less than first row (%s)",
Bytes.toStringBinary(end),
Bytes.toStringBinary(start));
byte[][] splits = Bytes.split(start, end, true,
numSplits - 1);
Preconditions.checkState(splits != null,
"Could not calculate input splits with given user input: " + this);
if (inclusive) {
return splits;
} else {
// remove endpoints, which are included in the splits list
return Arrays.copyOfRange(splits, 1, splits.length - 1);
}
}
@Override @Override
public byte[] firstRow() { public byte[] firstRow() {
return firstRowBytes; return firstRowBytes;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobClient;
@ -131,20 +132,20 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Test @Test
@Override @Override
public void testWithMockedMapReduceMultiRegion() throws Exception { public void testWithMockedMapReduceMultiRegion() throws Exception {
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10); testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10);
} }
@Test @Test
@Override @Override
public void testWithMapReduceMultiRegion() throws Exception { public void testWithMapReduceMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false); testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
} }
@Test @Test
@Override @Override
// run the MR job while HBase is offline // run the MR job while HBase is offline
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true); testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
} }
@Override @Override
@ -158,7 +159,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override @Override
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits) throws Exception { int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
setupCluster(); setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce"); TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try { try {
@ -168,9 +169,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
JobConf job = new JobConf(util.getConfiguration()); JobConf job = new JobConf(util.getConfiguration());
Path tmpTableDir = util.getRandomDir(); Path tmpTableDir = util.getRandomDir();
if (numSplitsPerRegion > 1) {
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
numSplitsPerRegion);
} else {
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir); NullWritable.class, job, false, tmpTableDir);
}
// mapred doesn't support start and end keys? o.O // mapred doesn't support start and end keys? o.O
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
@ -219,16 +227,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override @Override
protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception { boolean shutdownCluster) throws Exception {
doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
numRegions, expectedNumSplits, shutdownCluster); numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
} }
// this is also called by the IntegrationTestTableSnapshotInputFormat // this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
int expectedNumSplits, boolean shutdownCluster) throws Exception { int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
//create the table and snapshot //create the table and snapshot
createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
@ -245,9 +253,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf, org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
TestTableSnapshotInputFormat.class); TestTableSnapshotInputFormat.class);
if(numSplitsPerRegion > 1) {
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
numSplitsPerRegion);
} else {
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, jobConf, true, tableDir); NullWritable.class, jobConf, true, tableDir);
}
jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
jobConf.setNumReduceTasks(1); jobConf.setNumReduceTasks(1);

View File

@ -79,10 +79,10 @@ public abstract class TableSnapshotInputFormatTestBase {
} }
protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits) throws Exception; int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception;
protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception; boolean shutdownCluster) throws Exception;
protected abstract byte[] getStartRow(); protected abstract byte[] getStartRow();
@ -91,28 +91,33 @@ public abstract class TableSnapshotInputFormatTestBase {
@Test @Test
public void testWithMockedMapReduceSingleRegion() throws Exception { public void testWithMockedMapReduceSingleRegion() throws Exception {
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1);
} }
@Test @Test
public void testWithMockedMapReduceMultiRegion() throws Exception { public void testWithMockedMapReduceMultiRegion() throws Exception {
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8);
} }
@Test @Test
public void testWithMapReduceSingleRegion() throws Exception { public void testWithMapReduceSingleRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, 1, false);
} }
@Test @Test
public void testWithMapReduceMultiRegion() throws Exception { public void testWithMapReduceMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false); testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 8, false);
}
@Test
public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
} }
@Test @Test
// run the MR job while HBase is offline // run the MR job while HBase is offline
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 8, true);
} }
// Test that snapshot restore does not create back references in the HBase root dir. // Test that snapshot restore does not create back references in the HBase root dir.
@ -160,13 +165,13 @@ public abstract class TableSnapshotInputFormatTestBase {
String snapshotName, Path tmpTableDir) throws Exception; String snapshotName, Path tmpTableDir) throws Exception;
protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName, protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
setupCluster(); setupCluster();
try { try {
Path tableDir = util.getRandomDir(); Path tableDir = util.getRandomDir();
TableName tableName = TableName.valueOf("testWithMapReduce"); TableName tableName = TableName.valueOf("testWithMapReduce");
testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions, testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions,
expectedNumSplits, shutdownCluster); numSplitsPerRegion, expectedNumSplits, shutdownCluster);
} finally { } finally {
tearDownCluster(); tearDownCluster();
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
@ -202,7 +203,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override @Override
public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits) throws Exception { int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
setupCluster(); setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce"); TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try { try {
@ -213,9 +214,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
Path tmpTableDir = util.getRandomDir(); Path tmpTableDir = util.getRandomDir();
Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
if (numSplitsPerRegion > 1) {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
numSplitsPerRegion);
} else {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir); NullWritable.class, job, false, tmpTableDir);
}
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
@ -328,16 +336,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override @Override
protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception { boolean shutdownCluster) throws Exception {
doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
numRegions, expectedNumSplits, shutdownCluster); numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
} }
// this is also called by the IntegrationTestTableSnapshotInputFormat // this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
int expectedNumSplits, boolean shutdownCluster) throws Exception { int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
LOG.info("testing with MapReduce"); LOG.info("testing with MapReduce");
@ -358,9 +366,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
TestTableSnapshotInputFormat.class); TestTableSnapshotInputFormat.class);
if (numSplitsPerRegion > 1) {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
numSplitsPerRegion);
} else {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, true, tableDir); NullWritable.class, job, true, tableDir);
}
job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
job.setNumReduceTasks(1); job.setNumReduceTasks(1);

View File

@ -162,7 +162,17 @@ public class TestRegionSplitter {
// Halfway between df... and ff... should be ef.... // Halfway between df... and ff... should be ef....
splitPoint = splitter.split("dfffffff".getBytes(), lastRow); splitPoint = splitter.split("dfffffff".getBytes(), lastRow);
assertArrayEquals("efffffff".getBytes(), splitPoint); assertArrayEquals(splitPoint,"efffffff".getBytes());
// Check splitting region with multiple mappers per region
byte[][] splits = splitter.split("00000000".getBytes(), "30000000".getBytes(), 3, false);
assertEquals(2, splits.length);
assertArrayEquals(splits[0], "10000000".getBytes());
assertArrayEquals(splits[1], "20000000".getBytes());
splits = splitter.split("00000000".getBytes(), "20000000".getBytes(), 2, true);
assertEquals(3, splits.length);
assertArrayEquals(splits[1], "10000000".getBytes());
} }
/** /**
@ -253,6 +263,16 @@ public class TestRegionSplitter {
splitPoint = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'b'}); splitPoint = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'b'});
assertArrayEquals(splitPoint, new byte[] {'a', 'a', 'a', (byte)0x80 }); assertArrayEquals(splitPoint, new byte[] {'a', 'a', 'a', (byte)0x80 });
// Check splitting region with multiple mappers per region
byte[][] splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'd'}, 3, false);
assertEquals(2, splits.length);
assertArrayEquals(splits[0], new byte[]{'a', 'a', 'b'});
assertArrayEquals(splits[1], new byte[]{'a', 'a', 'c'});
splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'e'}, 2, true);
assertEquals(3, splits.length);
assertArrayEquals(splits[1], new byte[] { 'a', 'a', 'c'});
} }
@Test @Test