diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
index b98e21430c7..5c7fb459261 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
@@ -59,7 +60,7 @@ import org.junit.experimental.categories.Category;
* "IntegrationTestTableSnapshotInputFormat.table" => the name of the table
* "IntegrationTestTableSnapshotInputFormat.snapshot" => the name of the snapshot
* "IntegrationTestTableSnapshotInputFormat.numRegions" => number of regions in the table
- * to be created
+ * to be created (default, 32).
* "IntegrationTestTableSnapshotInputFormat.tableDir" => temporary directory to restore the
* snapshot files
*
@@ -78,10 +79,22 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
private static final String NUM_REGIONS_KEY =
"IntegrationTestTableSnapshotInputFormat.numRegions";
- private static final int DEFAULT_NUM_REGIONS = 32;
+ private static final String MR_IMPLEMENTATION_KEY =
+ "IntegrationTestTableSnapshotInputFormat.API";
+ private static final String MAPRED_IMPLEMENTATION = "mapred";
+ private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";
+ private static final int DEFAULT_NUM_REGIONS = 32;
private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
+ private static final byte[] START_ROW = Bytes.toBytes("bbb");
+ private static final byte[] END_ROW = Bytes.toBytes("yyy");
+
+ // mapred API is missing feature parity with mapreduce. See comments in
+ // mapred.TestTableSnapshotInputFormat
+ private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
+ private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
+
private IntegrationTestingUtility util;
@Override
@@ -124,17 +137,39 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
tableDir = new Path(tableDirStr);
}
- /* We create the table using HBaseAdmin#createTable(), which will create the table
- * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
- * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
- * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
- * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
- * the input format, and we expect numRegions - 2 splits between bbb and yyy.
- */
- int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
+ final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
+ if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
+ /*
+ * We create the table using HBaseAdmin#createTable(), which will create the table
+ * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
+ * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
+ * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
+ * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
+ * the input format, and we expect numRegions - 2 splits between bbb and yyy.
+ */
+ LOG.debug("Running job with mapreduce API.");
+ int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
- TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir,
- numRegions, expectedNumSplits, false);
+ org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
+ tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions,
+ expectedNumSplits, false);
+ } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
+ /*
+ * Similar considerations to above. The difference is that the mapred API does not support
+ * specifying start/end rows (or a Scan object at all), so the first and last regions are
+ * not omitted. See comments in mapred.TestTableSnapshotInputFormat for details of how that
+ * test works around the problem. This feature should be added
+ * in follow-on work.
+ */
+ LOG.debug("Running job with mapred API.");
+ int expectedNumSplits = numRegions;
+
+ org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
+ tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions,
+ expectedNumSplits, false);
+ } else {
+ throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr + ".");
+ }
return 0;
}
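
The new "IntegrationTestTableSnapshotInputFormat.API" key selects which MapReduce API the test exercises. Below is a minimal, hypothetical driver sketch (the class name and the table/snapshot values are illustrative, not part of this patch) showing how the configuration keys above might be set before handing off to ToolRunner:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.IntegrationTestTableSnapshotInputFormat;
import org.apache.hadoop.util.ToolRunner;

public class SnapshotInputFormatITDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Keys correspond to the constants defined in IntegrationTestTableSnapshotInputFormat.
    conf.set("IntegrationTestTableSnapshotInputFormat.table", "it_snapshot_table");
    conf.set("IntegrationTestTableSnapshotInputFormat.snapshot", "it_snapshot_table_snap");
    conf.setInt("IntegrationTestTableSnapshotInputFormat.numRegions", 32);
    // New in this patch: choose "mapred" or "mapreduce" (the default).
    conf.set("IntegrationTestTableSnapshotInputFormat.API", "mapred");
    int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
    System.exit(ret);
  }
}
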
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index 2823415b1f8..7e7ba7672ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Put;
@@ -69,7 +70,16 @@ public class TableMapReduceUtil {
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job) {
- initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true);
+ initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+ true, TableInputFormat.class);
+ }
+
+ public static void initTableMapJob(String table, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+ initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, TableInputFormat.class);
}
/**
@@ -88,9 +98,10 @@ public class TableMapReduceUtil {
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
- Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+ Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
+ Class<? extends InputFormat> inputFormat) {
- job.setInputFormat(TableInputFormat.class);
+ job.setInputFormat(inputFormat);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
@@ -113,6 +124,37 @@ public class TableMapReduceUtil {
}
}
+ /**
+ * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+ * and reads directly from snapshot files.
+ *
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param columns The columns to scan.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, the restore directory can be deleted.
+ * @throws IOException When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapJob(String snapshotName, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job,
+ boolean addDependencyJars, Path tmpRestoreDir)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+ initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, TableSnapshotInputFormat.class);
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+ }
+
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
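
For reference, a minimal, hypothetical mapred driver sketch using the new initTableSnapshotMapJob overload above; the snapshot name "my_snapshot", the column list "f1 f2", and the restore path are placeholders, not values from this patch:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SnapshotMapredJobSketch {

  // Trivial mapper that just re-emits each row key it reads from the snapshot.
  static class RowEmittingMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      collector.collect(key, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setJobName("snapshot-scan-sketch");
    // Snapshot name, column list, and restore directory are placeholders.
    TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "f1 f2",
        RowEmittingMapper.class, ImmutableBytesWritable.class, NullWritable.class,
        job, true, new Path("/tmp/snapshot_restore"));
    job.setNumReduceTasks(0);
    job.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(job);
  }
}
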
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
new file mode 100644
index 00000000000..356c5f7f502
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+
+ static class TableSnapshotRegionSplit implements InputSplit {
+ private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+ // constructor for mapreduce framework / Writable
+ public TableSnapshotRegionSplit() {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+ }
+
+ public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+ this.delegate = delegate;
+ }
+
+ public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
+ List<String> locations) {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return delegate.getLength();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return delegate.getLocations();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ delegate.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ delegate.readFields(in);
+ }
+ }
+
+ static class TableSnapshotRecordReader
+ implements RecordReader<ImmutableBytesWritable, Result> {
+
+ private TableSnapshotInputFormatImpl.RecordReader delegate;
+
+ public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
+ throws IOException {
+ delegate = new TableSnapshotInputFormatImpl.RecordReader();
+ delegate.initialize(split.delegate, job);
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+ if (!delegate.nextKeyValue()) {
+ return false;
+ }
+ ImmutableBytesWritable currentKey = delegate.getCurrentKey();
+ key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
+ value.copyFrom(delegate.getCurrentValue());
+ return true;
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return delegate.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return delegate.getProgress();
+ }
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ TableSnapshotInputFormatImpl.getSplits(job);
+ InputSplit[] results = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new TableSnapshotRegionSplit(splits.get(i));
+ }
+ return results;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+ }
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(JobConf job, String snapshotName, Path restoreDir)
+ throws IOException {
+ TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
+ }
+}
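
When a job is wired by hand rather than through TableMapReduceUtil, the sketch below (hypothetical snapshot name, columns, and path) mirrors what initTableSnapshotMapJob does: register the snapshot and restore directory via setInput, select this input format, and provide a space-separated column list, which the shared record reader falls back to since the mapred API carries no serialized Scan:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class ManualSnapshotInputSetup {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    // Restores "my_snapshot" under the given directory and records both in the job conf.
    TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
    job.setInputFormat(TableSnapshotInputFormat.class);
    // With no Scan available in mapred, the record reader builds one from this column list.
    job.set(TableInputFormat.COLUMN_LIST, "f1 f2");
  }
}
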
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 51aaf44b869..173b1b03f2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -271,6 +271,19 @@ public class TableMapReduceUtil {
outputValueClass, job, addDependencyJars, TableInputFormat.class);
}
+ /**
+ * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+ * direct memory will likely cause the map tasks to OOM when opening the region. This
+ * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+ * wants to override this behavior in their job.
+ */
+ public static void resetCacheConfig(Configuration conf) {
+ conf.setFloat(
+ HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ conf.setFloat("hbase.offheapcache.percentage", 0f);
+ conf.setFloat("hbase.bucketcache.size", 0f);
+ }
+
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
@@ -300,17 +313,7 @@ public class TableMapReduceUtil {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
-
- /*
- * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
- * direct memory will likely cause the map tasks to OOM when opening the region. This
- * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
- * wants to override this behavior in their job.
- */
- job.getConfiguration().setFloat(
- HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
- job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
- job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
+ resetCacheConfig(job.getConfiguration());
}
/**
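
A short, hypothetical sketch of the override the resetCacheConfig comment anticipates: after initTableSnapshotMapperJob has applied resetCacheConfig, an advanced job can still raise the on-heap block cache. The initTableSnapshotMapperJob parameter order is assumed from the hunk above, and the snapshot name, mapper, and restore path are placeholders:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotMapperCacheOverride {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "snapshot-mapper-sketch");
    TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(),
        IdentityTableMapper.class, ImmutableBytesWritable.class, Result.class,
        job, true, new Path("/tmp/snapshot_restore"));
    // resetCacheConfig() has been applied by the call above; a job that knows its map
    // tasks have heap to spare can still raise the on-heap block cache afterwards.
    job.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.4f);
  }
}
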
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index edf3077a1b5..f8d4d180599 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -18,43 +18,24 @@
package org.apache.hadoop.hbase.mapreduce;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
-import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -99,7 +80,7 @@ import com.google.common.annotations.VisibleForTesting;
* To read from snapshot files directly from the file system, the user who is running the MR job
* must have sufficient permissions to access snapshot and reference files.
* This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
- * user or the user must have group or other priviledges in the filesystem (See HBASE-8369).
+ * user or the user must have group or other privileges in the filesystem (See HBASE-8369).
* Note that, given other users access to read from snapshot/data files will completely circumvent
* the access control enforced by HBase.
* @see TableSnapshotScanner
@@ -107,166 +88,94 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
- // TODO: Snapshots files are owned in fs by the hbase user. There is no
- // easy way to delegate access.
-
- private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
-
- /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
- private static final String LOCALITY_CUTOFF_MULTIPLIER =
- "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
- private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
-
- private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
- private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
@VisibleForTesting
static class TableSnapshotRegionSplit extends InputSplit implements Writable {
- private HTableDescriptor htd;
- private HRegionInfo regionInfo;
- private String[] locations;
+ private TableSnapshotInputFormatImpl.InputSplit delegate;
// constructor for mapreduce framework / Writable
- public TableSnapshotRegionSplit() { }
-
- TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
- this.htd = htd;
- this.regionInfo = regionInfo;
- if (locations == null || locations.isEmpty()) {
- this.locations = new String[0];
- } else {
- this.locations = locations.toArray(new String[locations.size()]);
- }
+ public TableSnapshotRegionSplit() {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
}
+
+ public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+ this.delegate = delegate;
+ }
+
+ public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
+ List<String> locations) {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+ }
+
@Override
public long getLength() throws IOException, InterruptedException {
- //TODO: We can obtain the file sizes of the snapshot here.
- return 0;
+ return delegate.getLength();
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return locations;
+ return delegate.getLocations();
}
- // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
- // doing this wrapping with Writables.
@Override
public void write(DataOutput out) throws IOException {
- MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
- MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
- .setTable(htd.convert())
- .setRegion(HRegionInfo.convert(regionInfo));
-
- for (String location : locations) {
- builder.addLocations(location);
- }
-
- MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- split.writeTo(baos);
- baos.close();
- byte[] buf = baos.toByteArray();
- out.writeInt(buf.length);
- out.write(buf);
+ delegate.write(out);
}
+
@Override
public void readFields(DataInput in) throws IOException {
- int len = in.readInt();
- byte[] buf = new byte[len];
- in.readFully(buf);
- MapReduceProtos.TableSnapshotRegionSplit split =
- MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
- this.htd = HTableDescriptor.convert(split.getTable());
- this.regionInfo = HRegionInfo.convert(split.getRegion());
- List<String> locationsList = split.getLocationsList();
- this.locations = locationsList.toArray(new String[locationsList.size()]);
+ delegate.readFields(in);
}
}
@VisibleForTesting
static class TableSnapshotRegionRecordReader extends
- RecordReader<ImmutableBytesWritable, Result> {
- private TableSnapshotRegionSplit split;
- private Scan scan;
- private Result result = null;
- private ImmutableBytesWritable row = null;
- private ClientSideRegionScanner scanner;
+ RecordReader<ImmutableBytesWritable, Result> {
+ private TableSnapshotInputFormatImpl.RecordReader delegate =
+ new TableSnapshotInputFormatImpl.RecordReader();
private TaskAttemptContext context;
private Method getCounter;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
-
- Configuration conf = context.getConfiguration();
- this.split = (TableSnapshotRegionSplit) split;
- HTableDescriptor htd = this.split.htd;
- HRegionInfo hri = this.split.regionInfo;
- FileSystem fs = FSUtils.getCurrentFileSystem(conf);
-
- Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
- // directory where snapshot was restored
-
- // create scan
- String scanStr = conf.get(TableInputFormat.SCAN);
- if (scanStr == null) {
- throw new IllegalArgumentException("A Scan is not configured for this job");
- }
- scan = TableMapReduceUtil.convertStringToScan(scanStr);
- // region is immutable, this should be fine,
- // otherwise we have to set the thread read point
- scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
- // disable caching of data blocks
- scan.setCacheBlocks(false);
-
- scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
this.context = context;
getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+ delegate.initialize(
+ ((TableSnapshotRegionSplit) split).delegate,
+ context.getConfiguration());
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- result = scanner.next();
- if (result == null) {
- //we are done
- return false;
+ boolean result = delegate.nextKeyValue();
+ if (result) {
+ ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
+ if (scanMetrics != null && context != null) {
+ TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+ }
}
-
- if (this.row == null) {
- this.row = new ImmutableBytesWritable();
- }
- this.row.set(result.getRow());
-
- ScanMetrics scanMetrics = scanner.getScanMetrics();
- if (scanMetrics != null && context != null) {
- TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
- }
-
- return true;
- }
-
- @Override
- public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
- return row;
- }
-
- @Override
- public Result getCurrentValue() throws IOException, InterruptedException {
return result;
}
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return delegate.getCurrentKey();
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return delegate.getCurrentValue();
+ }
+
@Override
public float getProgress() throws IOException, InterruptedException {
- return 0; // TODO: use total bytes to estimate
+ return delegate.getProgress();
}
@Override
public void close() throws IOException {
- if (this.scanner != null) {
- this.scanner.close();
- }
+ delegate.close();
}
}
@@ -278,88 +187,12 @@ public class TableSnapshotInputFormat extends InputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
- Configuration conf = job.getConfiguration();
- String snapshotName = getSnapshotName(conf);
-
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
-
- Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
- SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
- SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
- List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
- if (regionManifests == null) {
- throw new IllegalArgumentException("Snapshot seems empty");
+ List<InputSplit> results = new ArrayList<InputSplit>();
+ for (TableSnapshotInputFormatImpl.InputSplit split :
+ TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
+ results.add(new TableSnapshotRegionSplit(split));
}
-
- // load table descriptor
- HTableDescriptor htd = manifest.getTableDescriptor();
-
- Scan scan = TableMapReduceUtil.convertStringToScan(conf
- .get(TableInputFormat.SCAN));
- Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
- for (SnapshotRegionManifest regionManifest : regionManifests) {
- // load region descriptor
- HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
-
- if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
- hri.getStartKey(), hri.getEndKey())) {
- // compute HDFS locations from snapshot files (which will get the locations for
- // referred hfiles)
- List<String> hosts = getBestLocations(conf,
- HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
- int len = Math.min(3, hosts.size());
- hosts = hosts.subList(0, len);
- splits.add(new TableSnapshotRegionSplit(htd, hri, hosts));
- }
- }
-
- return splits;
- }
-
- /**
- * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
- * weights into account, thus will treat every location passed from the input split as equal. We
- * do not want to blindly pass all the locations, since we are creating one split per region, and
- * the region's blocks are all distributed throughout the cluster unless favorite node assignment
- * is used. On the expected stable case, only one location will contain most of the blocks as
- * local.
- * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
- * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
- * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
- * host with the best locality.
- */
- @VisibleForTesting
- List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
- List<String> locations = new ArrayList<String>(3);
-
- HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
-
- if (hostAndWeights.length == 0) {
- return locations;
- }
-
- HostAndWeight topHost = hostAndWeights[0];
- locations.add(topHost.getHost());
-
- // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
- double cutoffMultiplier
- = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
-
- double filterWeight = topHost.getWeight() * cutoffMultiplier;
-
- for (int i = 1; i < hostAndWeights.length; i++) {
- if (hostAndWeights[i].getWeight() >= filterWeight) {
- locations.add(hostAndWeights[i].getHost());
- } else {
- break;
- }
- }
-
- return locations;
+ return results;
}
/**
@@ -371,26 +204,8 @@ public class TableSnapshotInputFormat extends InputFormat locations) {
+ this.htd = htd;
+ this.regionInfo = regionInfo;
+ if (locations == null || locations.isEmpty()) {
+ this.locations = new String[0];
+ } else {
+ this.locations = locations.toArray(new String[locations.size()]);
+ }
+ }
+
+ public long getLength() {
+ //TODO: We can obtain the file sizes of the snapshot here.
+ return 0;
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ public HTableDescriptor getTableDescriptor() {
+ return htd;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return regionInfo;
+ }
+
+ // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+ // doing this wrapping with Writables.
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
+ .setTable(htd.convert())
+ .setRegion(HRegionInfo.convert(regionInfo));
+
+ for (String location : locations) {
+ builder.addLocations(location);
+ }
+
+ TableSnapshotRegionSplit split = builder.build();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ split.writeTo(baos);
+ baos.close();
+ byte[] buf = baos.toByteArray();
+ out.writeInt(buf.length);
+ out.write(buf);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ byte[] buf = new byte[len];
+ in.readFully(buf);
+ TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+ this.htd = HTableDescriptor.convert(split.getTable());
+ this.regionInfo = HRegionInfo.convert(split.getRegion());
+ List<String> locationsList = split.getLocationsList();
+ this.locations = locationsList.toArray(new String[locationsList.size()]);
+ }
+ }
+
+ /**
+ * Implementation class for RecordReader logic common between mapred and mapreduce.
+ */
+ public static class RecordReader {
+ private InputSplit split;
+ private Scan scan;
+ private Result result = null;
+ private ImmutableBytesWritable row = null;
+ private ClientSideRegionScanner scanner;
+
+ public ClientSideRegionScanner getScanner() {
+ return scanner;
+ }
+
+ public void initialize(InputSplit split, Configuration conf) throws IOException {
+ this.split = split;
+ HTableDescriptor htd = split.htd;
+ HRegionInfo hri = this.split.getRegionInfo();
+ FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+ Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
+ // directory where snapshot was restored
+
+ // create scan
+ // TODO: mapred does not support scan as input API. Work around for now.
+ if (conf.get(TableInputFormat.SCAN) != null) {
+ scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+ } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+ String[] columns =
+ conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+ scan = new Scan();
+ for (String col : columns) {
+ scan.addFamily(Bytes.toBytes(col));
+ }
+ } else {
+ throw new IllegalArgumentException("A Scan is not configured for this job");
+ }
+
+ // region is immutable, this should be fine,
+ // otherwise we have to set the thread read point
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+ // disable caching of data blocks
+ scan.setCacheBlocks(false);
+
+ scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+ }
+
+ public boolean nextKeyValue() throws IOException {
+ result = scanner.next();
+ if (result == null) {
+ //we are done
+ return false;
+ }
+
+ if (this.row == null) {
+ this.row = new ImmutableBytesWritable();
+ }
+ this.row.set(result.getRow());
+ return true;
+ }
+
+ public ImmutableBytesWritable getCurrentKey() {
+ return row;
+ }
+
+ public Result getCurrentValue() {
+ return result;
+ }
+
+ public long getPos() {
+ return 0;
+ }
+
+ public float getProgress() {
+ return 0; // TODO: use total bytes to estimate
+ }
+
+ public void close() {
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
+ }
+ }
+
+ public static List<InputSplit> getSplits(Configuration conf) throws IOException {
+ String snapshotName = getSnapshotName(conf);
+
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+ List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+ if (regionManifests == null) {
+ throw new IllegalArgumentException("Snapshot seems empty");
+ }
+
+ // load table descriptor
+ HTableDescriptor htd = manifest.getTableDescriptor();
+
+ // TODO: mapred does not support scan as input API. Work around for now.
+ Scan scan = null;
+ if (conf.get(TableInputFormat.SCAN) != null) {
+ scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+ } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+ String[] columns =
+ conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+ scan = new Scan();
+ for (String col : columns) {
+ scan.addFamily(Bytes.toBytes(col));
+ }
+ } else {
+ throw new IllegalArgumentException("Unable to create scan");
+ }
+ Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ for (SnapshotRegionManifest regionManifest : regionManifests) {
+ // load region descriptor
+ HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
+
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey())) {
+ // compute HDFS locations from snapshot files (which will get the locations for
+ // referred hfiles)
+ List<String> hosts = getBestLocations(conf,
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+ int len = Math.min(3, hosts.size());
+ hosts = hosts.subList(0, len);
+ splits.add(new InputSplit(htd, hri, hosts));
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+ * weights into account, and thus treat every location passed from the input split as equal. We
+ * do not want to blindly pass all the locations, since we are creating one split per region, and
+ * the region's blocks are all distributed throughout the cluster unless favored node assignment
+ * is used. In the expected stable case, only one location will contain most of the blocks as
+ * local.
+ * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
+ * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
+ * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+ * host with the best locality.
+ */
+ public static List<String> getBestLocations(
+ Configuration conf, HDFSBlocksDistribution blockDistribution) {
+ List<String> locations = new ArrayList<String>(3);
+
+ HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+ if (hostAndWeights.length == 0) {
+ return locations;
+ }
+
+ HostAndWeight topHost = hostAndWeights[0];
+ locations.add(topHost.getHost());
+
+ // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+ double cutoffMultiplier
+ = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+ double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+ for (int i = 1; i < hostAndWeights.length; i++) {
+ if (hostAndWeights[i].getWeight() >= filterWeight) {
+ locations.add(hostAndWeights[i].getHost());
+ } else {
+ break;
+ }
+ }
+
+ return locations;
+ }
+
+ private static String getSnapshotName(Configuration conf) {
+ String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+ if (snapshotName == null) {
+ throw new IllegalArgumentException("Snapshot name must be provided");
+ }
+ return snapshotName;
+ }
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param conf the job configuration
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
+ throws IOException {
+ conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+
+ // TODO: restore from record readers to parallelize.
+ RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+ conf.set(TABLE_DIR_KEY, restoreDir.toString());
+ }
+}
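
To make the locality cutoff concrete, here is a small hypothetical example of the getBestLocations heuristic, assuming the default 0.8 cutoff multiplier and that the class lands in org.apache.hadoop.hbase.mapreduce as this patch suggests; host names and weights are made up:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;

public class BestLocationsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
    dist.addHostsAndBlockWeight(new String[] { "h1" }, 10);  // top host
    dist.addHostsAndBlockWeight(new String[] { "h2" }, 9);   // 90% of top host, kept
    dist.addHostsAndBlockWeight(new String[] { "h3" }, 2);   // below the 0.8 cutoff, dropped
    List<String> locations = TableSnapshotInputFormatImpl.getBestLocations(conf, dist);
    System.out.println(locations);  // expected: [h1, h2]
  }
}
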
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
new file mode 100644
index 00000000000..fb13d703b49
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
+
+ private static final byte[] aaa = Bytes.toBytes("aaa");
+ private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
+ private static final String COLUMNS =
+ Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);
+
+ @Override
+ protected byte[] getStartRow() {
+ return aaa;
+ }
+
+ @Override
+ protected byte[] getEndRow() {
+ return after_zzz;
+ }
+
+ static class TestTableSnapshotMapper extends MapReduceBase
+ implements TableMap<ImmutableBytesWritable, NullWritable> {
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
+ throws IOException {
+ verifyRowFromMap(key, value);
+ collector.collect(key, NullWritable.get());
+ }
+ }
+
+ public static class TestTableSnapshotReducer extends MapReduceBase
+ implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+ HBaseTestingUtility.SeenRowTracker rowTracker =
+ new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
+ OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+ throws IOException {
+ rowTracker.addRow(key.get());
+ }
+
+ @Override
+ public void close() {
+ rowTracker.validate();
+ }
+ }
+
+ @Test
+ public void testInitTableSnapshotMapperJobConfig() throws Exception {
+ setupCluster();
+ TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
+ String snapshotName = "foo";
+
+ try {
+ createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+ JobConf job = new JobConf(UTIL.getConfiguration());
+ Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+ COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ // TODO: would be better to examine directly the cache instance that results from this
+ // config. Currently this is not possible because BlockCache initialization is static.
+ Assert.assertEquals(
+ "Snapshot job should be configured for default LruBlockCache.",
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+ job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+ Assert.assertEquals(
+ "Snapshot job should not use SlabCache.",
+ 0, job.getFloat("hbase.offheapcache.percentage", -1), 0.01);
+ Assert.assertEquals(
+ "Snapshot job should not use BucketCache.",
+ 0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
+ } finally {
+ UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ // TODO: mapred does not support limiting input range by startrow, endrow.
+ // Thus the following tests must override parameter verification.
+
+ @Test
+ @Override
+ public void testWithMockedMapReduceMultiRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
+ }
+
+ @Test
+ @Override
+ public void testWithMapReduceMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
+ }
+
+ @Test
+ @Override
+ // run the MR job while HBase is offline
+ public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
+ }
+
+ @Override
+ protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits) throws Exception {
+ setupCluster();
+ TableName tableName = TableName.valueOf("testWithMockedMapReduce");
+ try {
+ createTableAndSnapshot(
+ util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
+
+ JobConf job = new JobConf(util.getConfiguration());
+ Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+ COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ // mapred does not support start and end keys, so verify over the full row range.
+ verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
+
+ } finally {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
+ byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ InputSplit[] splits = tsif.getSplits(job, 0);
+
+ Assert.assertEquals(expectedNumSplits, splits.length);
+
+ HBaseTestingUtility.SeenRowTracker rowTracker =
+ new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+ for (int i = 0; i < splits.length; i++) {
+ // validate input split
+ InputSplit split = splits[i];
+ Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
+
+ // validate record reader
+ OutputCollector collector = mock(OutputCollector.class);
+ Reporter reporter = mock(Reporter.class);
+ RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);
+
+ // validate we can read all the data back
+ ImmutableBytesWritable key = rr.createKey();
+ Result value = rr.createValue();
+ while (rr.next(key, value)) {
+ verifyRowFromMap(key, value);
+ rowTracker.addRow(key.copyBytes());
+ }
+
+ rr.close();
+ }
+
+ // validate all rows are seen
+ rowTracker.validate();
+ }
+
+ @Override
+ protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ boolean shutdownCluster) throws Exception {
+ doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+ numRegions, expectedNumSplits, shutdownCluster);
+ }
+
+ // this is also called by the IntegrationTestTableSnapshotInputFormat
+ public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+ int expectedNumSplits, boolean shutdownCluster) throws Exception {
+
+ //create the table and snapshot
+ createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
+
+ if (shutdownCluster) {
+ util.shutdownMiniHBaseCluster();
+ }
+
+ try {
+ // create the job
+ JobConf jobConf = new JobConf(util.getConfiguration());
+
+ jobConf.setJarByClass(util.getClass());
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
+ TestTableSnapshotInputFormat.class);
+
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
+ TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, jobConf, true, tableDir);
+
+ jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+ jobConf.setNumReduceTasks(1);
+ jobConf.setOutputFormat(NullOutputFormat.class);
+
+ RunningJob job = JobClient.runJob(jobConf);
+ Assert.assertTrue(job.isSuccessful());
+ } finally {
+ if (!shutdownCluster) {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
new file mode 100644
index 00000000000..e82b3575504
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public abstract class TableSnapshotInputFormatTestBase {
+
+ protected final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ protected static final int NUM_REGION_SERVERS = 2;
+ protected static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+
+ protected FileSystem fs;
+ protected Path rootDir;
+
+ public void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(NUM_REGION_SERVERS);
+ rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ fs = rootDir.getFileSystem(UTIL.getConfiguration());
+ }
+
+ public void tearDownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void setupConf(Configuration conf) {
+ // Enable snapshot
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ }
+
+ protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits) throws Exception;
+
+ protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ boolean shutdownCluster) throws Exception;
+
+ protected abstract byte[] getStartRow();
+
+ protected abstract byte[] getEndRow();
+
+ @Test
+ public void testWithMockedMapReduceSingleRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+ }
+
+ @Test
+ public void testWithMockedMapReduceMultiRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+ }
+
+ @Test
+ public void testWithMapReduceSingleRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+ }
+
+ @Test
+ public void testWithMapReduceMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+ }
+
+ @Test
+ // run the MR job while HBase is offline
+ public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+ }
+
+ protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ setupCluster();
+ util.startMiniMapReduceCluster();
+ try {
+ Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
+ TableName tableName = TableName.valueOf("testWithMapReduce");
+ testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions,
+ expectedNumSplits, shutdownCluster);
+ } finally {
+ util.shutdownMiniMapReduceCluster();
+ tearDownCluster();
+ }
+ }
+
+ protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
+ throws IOException {
+ byte[] row = key.get();
+ CellScanner scanner = result.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+
+ //assert that all Cells in the Result have the same key
+ Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+ cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ }
+
+ for (int j = 0; j < FAMILIES.length; j++) {
+ byte[] actual = result.getValue(FAMILIES[j], null);
+ Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ + " ,actual:" + Bytes.toString(actual), row, actual);
+ }
+ }
+
+ protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
+ throws Exception {
+ try {
+ util.deleteTable(tableName);
+ } catch(Exception ex) {
+ // ignore
+ }
+
+ if (numRegions > 1) {
+ util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
+ } else {
+ util.createTable(tableName, FAMILIES);
+ }
+ HBaseAdmin admin = util.getHBaseAdmin();
+
+ // put some stuff in the table
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ util.loadTable(table, FAMILIES);
+
+ Path rootDir = FSUtils.getRootDir(util.getConfiguration());
+ FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+ Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+ // load different values
+ byte[] value = Bytes.toBytes("after_snapshot_value");
+ util.loadTable(table, FAMILIES, value);
+
+ // cause flush to create new files in the region
+ admin.flush(tableName.toString());
+ table.close();
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 9f2d390581a..9e7102d24cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -22,33 +22,20 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
-import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
-import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -64,34 +51,19 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(LargeTests.class)
-public class TestTableSnapshotInputFormat {
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
- private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
- private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static final int NUM_REGION_SERVERS = 2;
- private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
- private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
- private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
- public static byte[] bbb = Bytes.toBytes("bbb");
- public static byte[] yyy = Bytes.toBytes("yyy");
+ private static final byte[] bbb = Bytes.toBytes("bbb");
+ private static final byte[] yyy = Bytes.toBytes("yyy");
- private FileSystem fs;
- private Path rootDir;
-
- public void setupCluster() throws Exception {
- setupConf(UTIL.getConfiguration());
- UTIL.startMiniCluster(NUM_REGION_SERVERS);
- rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
- fs = rootDir.getFileSystem(UTIL.getConfiguration());
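+ // row range scanned by these tests; createTableAndSnapshot() also splits the table on these keys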
+ @Override
+ protected byte[] getStartRow() {
+ return bbb;
}
- public void tearDownCluster() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- private static void setupConf(Configuration conf) {
- // Enable snapshot
- conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ @Override
+ protected byte[] getEndRow() {
+ return yyy;
}
@After
@@ -100,7 +72,7 @@ public class TestTableSnapshotInputFormat {
@Test
public void testGetBestLocations() throws IOException {
- TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
Configuration conf = UTIL.getConfiguration();
HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
@@ -169,41 +141,6 @@ public class TestTableSnapshotInputFormat {
}
}
- public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
- String snapshotName, int numRegions)
- throws Exception {
- try {
- util.deleteTable(tableName);
- } catch(Exception ex) {
- // ignore
- }
-
- if (numRegions > 1) {
- util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
- } else {
- util.createTable(tableName, FAMILIES);
- }
- HBaseAdmin admin = util.getHBaseAdmin();
-
- // put some stuff in the table
- HTable table = new HTable(util.getConfiguration(), tableName);
- util.loadTable(table, FAMILIES);
-
- Path rootDir = FSUtils.getRootDir(util.getConfiguration());
- FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
-
- SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
- Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
-
- // load different values
- byte[] value = Bytes.toBytes("after_snapshot_value");
- util.loadTable(table, FAMILIES, value);
-
- // cause flush to create new files in the region
- admin.flush(tableName.toString());
- table.close();
- }
-
@Test
public void testInitTableSnapshotMapperJobConfig() throws Exception {
setupCluster();
@@ -211,7 +148,7 @@ public class TestTableSnapshotInputFormat {
String snapshotName = "foo";
try {
- createTableAndSnapshot(UTIL, tableName, snapshotName, 1);
+ createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
Job job = new Job(UTIL.getConfiguration());
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
@@ -238,32 +175,23 @@ public class TestTableSnapshotInputFormat {
}
}
- @Test
- public void testWithMockedMapReduceSingleRegion() throws Exception {
- testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
- }
-
- @Test
- public void testWithMockedMapReduceMultiRegion() throws Exception {
- testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
- }
-
public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
int numRegions, int expectedNumSplits) throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try {
- createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+ createTableAndSnapshot(
+ util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
Job job = new Job(util.getConfiguration());
Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
- Scan scan = new Scan(bbb, yyy); // limit the scan
+ Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, tmpTableDir);
- verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+ verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
} finally {
util.getHBaseAdmin().deleteSnapshot(snapshotName);
@@ -309,63 +237,21 @@ public class TestTableSnapshotInputFormat {
rowTracker.validate();
}
- public static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
- throws IOException {
- byte[] row = key.get();
- CellScanner scanner = result.cellScanner();
- while (scanner.advance()) {
- Cell cell = scanner.current();
-
- //assert that all Cells in the Result have the same key
- Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
- cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
- }
-
- for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
- Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
- + " ,actual:" + Bytes.toString(actual), row, actual);
- }
- }
-
- @Test
- public void testWithMapReduceSingleRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
- }
-
- @Test
- public void testWithMapReduceMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
- }
-
- @Test
- // run the MR job while HBase is offline
- public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
- }
-
- private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
- setupCluster();
- util.startMiniMapReduceCluster();
- try {
- Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
- TableName tableName = TableName.valueOf("testWithMapReduce");
- doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
- expectedNumSplits, shutdownCluster);
- } finally {
- util.shutdownMiniMapReduceCluster();
- tearDownCluster();
- }
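+ // hook invoked by TableSnapshotInputFormatTestBase; runs the shared flow with this class's key range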
+ @Override
+ protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ boolean shutdownCluster) throws Exception {
+ doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+ numRegions, expectedNumSplits, shutdownCluster);
}
// this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
- String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
- boolean shutdownCluster) throws Exception {
+ String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+ int expectedNumSplits, boolean shutdownCluster) throws Exception {
//create the table and snapshot
- createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+ createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
if (shutdownCluster) {
util.shutdownMiniHBaseCluster();
@@ -374,7 +260,7 @@ public class TestTableSnapshotInputFormat {
try {
// create the job
Job job = new Job(util.getConfiguration());
- Scan scan = new Scan(bbb, yyy); // limit the scan
+ Scan scan = new Scan(startRow, endRow); // limit the scan
job.setJarByClass(util.getClass());
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),