HBASE-11137 Add mapred.TableSnapshotInputFormat

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1594982 13f79535-47bb-0310-9956-ffa450edef68

parent 712ce70e2e
commit 4c8510f852
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
@ -59,7 +60,7 @@ import org.junit.experimental.categories.Category;
* <br>"IntegrationTestTableSnapshotInputFormat.table" => the name of the table
* <br>"IntegrationTestTableSnapshotInputFormat.snapshot" => the name of the snapshot
* <br>"IntegrationTestTableSnapshotInputFormat.numRegions" => number of regions in the table
* to be created
* to be created (default, 32).
* <br>"IntegrationTestTableSnapshotInputFormat.tableDir" => temporary directory to restore the
* snapshot files
*
@ -78,10 +79,22 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
private static final String NUM_REGIONS_KEY =
"IntegrationTestTableSnapshotInputFormat.numRegions";

private static final int DEFAULT_NUM_REGIONS = 32;
private static final String MR_IMPLEMENTATION_KEY =
"IntegrationTestTableSnapshotInputFormat.API";
private static final String MAPRED_IMPLEMENTATION = "mapred";
private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";

private static final int DEFAULT_NUM_REGIONS = 32;
private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";

private static final byte[] START_ROW = Bytes.toBytes("bbb");
private static final byte[] END_ROW = Bytes.toBytes("yyy");

// mapred API is missing feature parity with mapreduce. See comments in
// mapred.TestTableSnapshotInputFormat
private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'

private IntegrationTestingUtility util;

@Override
@ -124,17 +137,39 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
tableDir = new Path(tableDirStr);
}

/* We create the table using HBaseAdmin#createTable(), which will create the table
* with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
* desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
* create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
* bbb and endRow yyy, so, we expect the first and last region to be filtered out in
* the input format, and we expect numRegions - 2 splits between bbb and yyy.
*/
int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
/*
* We create the table using HBaseAdmin#createTable(), which will create the table
* with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
* desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
* create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
* bbb and endRow yyy, so, we expect the first and last region to be filtered out in
* the input format, and we expect numRegions - 2 splits between bbb and yyy.
*/
LOG.debug("Running job with mapreduce API.");
int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;

TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir,
numRegions, expectedNumSplits, false);
org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions,
expectedNumSplits, false);
} else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
/*
* Similar considerations to above. The difference is that the mapred API does not support
* specifying start/end rows (or a scan object at all), so the first and last regions
* are not omitted. See comments in mapred.TestTableSnapshotInputFormat
* for details of how that test works around the problem. This feature should be added
* in follow-on work.
*/
LOG.debug("Running job with mapred API.");
int expectedNumSplits = numRegions;

org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions,
expectedNumSplits, false);
} else {
throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr + ".");
}

return 0;
}

@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -69,7 +70,16 @@ public class TableMapReduceUtil {
|
|||
Class<? extends TableMap> mapper,
|
||||
Class<?> outputKeyClass,
|
||||
Class<?> outputValueClass, JobConf job) {
|
||||
initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true);
|
||||
initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
|
||||
true, TableInputFormat.class);
|
||||
}
|
||||
|
||||
public static void initTableMapJob(String table, String columns,
|
||||
Class<? extends TableMap> mapper,
|
||||
Class<?> outputKeyClass,
|
||||
Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
|
||||
initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
|
||||
addDependencyJars, TableInputFormat.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -88,9 +98,10 @@ public class TableMapReduceUtil {
|
|||
public static void initTableMapJob(String table, String columns,
|
||||
Class<? extends TableMap> mapper,
|
||||
Class<?> outputKeyClass,
|
||||
Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
|
||||
Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
|
||||
Class<? extends InputFormat> inputFormat) {
|
||||
|
||||
job.setInputFormat(TableInputFormat.class);
|
||||
job.setInputFormat(inputFormat);
|
||||
job.setMapOutputValueClass(outputValueClass);
|
||||
job.setMapOutputKeyClass(outputKeyClass);
|
||||
job.setMapperClass(mapper);
|
||||
|
@ -113,6 +124,37 @@ public class TableMapReduceUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the job for reading from a table snapshot. It bypasses HBase servers
* and reads directly from snapshot files.
|
||||
*
|
||||
* @param snapshotName The name of the snapshot (of a table) to read from.
|
||||
* @param columns The columns to scan.
|
||||
* @param mapper The mapper class to use.
|
||||
* @param outputKeyClass The class of the output key.
|
||||
* @param outputValueClass The class of the output value.
|
||||
* @param job The current job to adjust. Make sure the passed job is
|
||||
* carrying all necessary HBase configuration.
|
||||
* @param addDependencyJars upload HBase jars and jars for any of the configured
|
||||
* job classes via the distributed cache (tmpjars).
|
||||
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* After the job is finished, restore directory can be deleted.
|
||||
* @throws IOException When setting up the details fails.
|
||||
* @see TableSnapshotInputFormat
|
||||
*/
|
||||
public static void initTableSnapshotMapJob(String snapshotName, String columns,
|
||||
Class<? extends TableMap> mapper,
|
||||
Class<?> outputKeyClass,
|
||||
Class<?> outputValueClass, JobConf job,
|
||||
boolean addDependencyJars, Path tmpRestoreDir)
|
||||
throws IOException {
|
||||
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
|
||||
initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
|
||||
addDependencyJars, TableSnapshotInputFormat.class);
|
||||
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
|
||||
}
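// Illustrative sketch (not part of this patch): a minimal mapred driver built on the method
// above. The snapshot name, column list, MyTableMap class and restore path are hypothetical
// placeholders; the call sequence mirrors mapred.TestTableSnapshotInputFormat further down in
// this change, and assumes the usual org.apache.hadoop.hbase.mapred imports.
//
//   JobConf job = new JobConf(HBaseConfiguration.create());
//   Path restoreDir = new Path("/tmp/snapshot-restore"); // scratch dir, not under hbase.rootdir
//
//   // MyTableMap implements TableMap<ImmutableBytesWritable, NullWritable>
//   TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "cf1 cf2",
//       MyTableMap.class, ImmutableBytesWritable.class, NullWritable.class,
//       job, true /* addDependencyJars */, restoreDir);
//
//   job.setNumReduceTasks(0);
//   job.setOutputFormat(NullOutputFormat.class);
//   JobClient.runJob(job);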
|
||||
|
||||
/**
|
||||
* Use this before submitting a TableReduce job. It will
|
||||
* appropriately set up the JobConf.
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
|
||||
|
||||
static class TableSnapshotRegionSplit implements InputSplit {
|
||||
private TableSnapshotInputFormatImpl.InputSplit delegate;
|
||||
|
||||
// constructor for mapreduce framework / Writable
|
||||
public TableSnapshotRegionSplit() {
|
||||
this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
|
||||
}
|
||||
|
||||
public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
|
||||
List<String> locations) {
|
||||
this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() throws IOException {
|
||||
return delegate.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getLocations() throws IOException {
|
||||
return delegate.getLocations();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
delegate.write(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
delegate.readFields(in);
|
||||
}
|
||||
}
|
||||
|
||||
static class TableSnapshotRecordReader
|
||||
implements RecordReader<ImmutableBytesWritable, Result> {
|
||||
|
||||
private TableSnapshotInputFormatImpl.RecordReader delegate;
|
||||
|
||||
public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
|
||||
throws IOException {
|
||||
delegate = new TableSnapshotInputFormatImpl.RecordReader();
|
||||
delegate.initialize(split.delegate, job);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
|
||||
if (!delegate.nextKeyValue()) {
|
||||
return false;
|
||||
}
|
||||
ImmutableBytesWritable currentKey = delegate.getCurrentKey();
|
||||
key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
|
||||
value.copyFrom(delegate.getCurrentValue());
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableBytesWritable createKey() {
|
||||
return new ImmutableBytesWritable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result createValue() {
|
||||
return new Result();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPos() throws IOException {
|
||||
return delegate.getPos();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getProgress() throws IOException {
|
||||
return delegate.getProgress();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
|
||||
List<TableSnapshotInputFormatImpl.InputSplit> splits =
|
||||
TableSnapshotInputFormatImpl.getSplits(job);
|
||||
InputSplit[] results = new InputSplit[splits.size()];
|
||||
for (int i = 0; i < splits.size(); i++) {
|
||||
results[i] = new TableSnapshotRegionSplit(splits.get(i));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordReader<ImmutableBytesWritable, Result>
|
||||
getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
|
||||
return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
|
||||
* @param job the job to configure
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* After the job is finished, restoreDir can be deleted.
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public static void setInput(JobConf job, String snapshotName, Path restoreDir)
|
||||
throws IOException {
|
||||
TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
|
||||
}
|
||||
}
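// Illustrative sketch (not part of this patch): driving this InputFormat by hand, the way
// mapred.TestTableSnapshotInputFormat#verifyWithMockedMapReduce does below. Assumes "job" is a
// JobConf already prepared via TableMapReduceUtil.initTableSnapshotMapJob(...).
//
//   TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
//   InputSplit[] splits = tsif.getSplits(job, 0); // one split per region in the snapshot
//   for (InputSplit split : splits) {
//     RecordReader<ImmutableBytesWritable, Result> rr =
//         tsif.getRecordReader(split, job, Reporter.NULL);
//     ImmutableBytesWritable key = rr.createKey();
//     Result value = rr.createValue();
//     while (rr.next(key, value)) {
//       // key is the row, value the Result read straight from the restored snapshot files
//     }
//     rr.close();
//   }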
|
|
@ -271,6 +271,19 @@ public class TableMapReduceUtil {
|
|||
outputValueClass, job, addDependencyJars, TableInputFormat.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
|
||||
* direct memory will likely cause the map tasks to OOM when opening the region. This
|
||||
* is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
|
||||
* wants to override this behavior in their job.
|
||||
*/
|
||||
public static void resetCacheConfig(Configuration conf) {
|
||||
conf.setFloat(
|
||||
HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
|
||||
conf.setFloat("hbase.offheapcache.percentage", 0f);
|
||||
conf.setFloat("hbase.bucketcache.size", 0f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the job for reading from a table snapshot. It bypasses HBase servers
* and reads directly from snapshot files.
|
||||
|
@ -300,17 +313,7 @@ public class TableMapReduceUtil {
|
|||
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
|
||||
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
|
||||
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
|
||||
|
||||
/*
|
||||
* Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
|
||||
* direct memory will likely cause the map tasks to OOM when opening the region. This
|
||||
* is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
|
||||
* wants to override this behavior in their job.
|
||||
*/
|
||||
job.getConfiguration().setFloat(
|
||||
HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
|
||||
job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
|
||||
job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
|
||||
resetCacheConfig(job.getConfiguration());
|
||||
}
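// Illustrative sketch (not part of this patch): the mapreduce-side counterpart of the mapred
// usage, assuming the enclosing initTableSnapshotMapperJob(snapshotName, scan, mapper,
// outputKeyClass, outputValueClass, job, addDependencyJars, tmpRestoreDir) signature whose
// body is shown above. The snapshot name, MyTableMapper class and restore path are
// hypothetical placeholders.
//
//   Job job = new Job(HBaseConfiguration.create());
//   Scan scan = new Scan(); // optionally restrict rows/columns
//   TableMapReduceUtil.initTableSnapshotMapperJob("mysnapshot", scan,
//       MyTableMapper.class, ImmutableBytesWritable.class, Result.class,
//       job, true /* addDependencyJars */, new Path("/tmp/snapshot-restore"));
//   job.setNumReduceTasks(0);
//   job.setOutputFormatClass(NullOutputFormat.class);
//   job.waitForCompletion(true);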
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,43 +18,24 @@
|
|||
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
|
||||
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
|
@ -99,7 +80,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* To read from snapshot files directly from the file system, the user who is running the MR job
|
||||
* must have sufficient permissions to access snapshot and reference files.
|
||||
* This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
|
||||
* user or the user must have group or other priviledges in the filesystem (See HBASE-8369).
|
||||
* user or the user must have group or other privileges in the filesystem (See HBASE-8369).
|
||||
* Note that granting other users access to read from snapshot/data files will completely
* circumvent the access control enforced by HBase.
|
||||
* @see TableSnapshotScanner
|
||||
|
@ -107,166 +88,94 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
|
||||
// TODO: Snapshots files are owned in fs by the hbase user. There is no
|
||||
// easy way to delegate access.
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
|
||||
|
||||
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
|
||||
private static final String LOCALITY_CUTOFF_MULTIPLIER =
|
||||
"hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
|
||||
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
|
||||
|
||||
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
|
||||
private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
|
||||
|
||||
@VisibleForTesting
|
||||
static class TableSnapshotRegionSplit extends InputSplit implements Writable {
|
||||
private HTableDescriptor htd;
|
||||
private HRegionInfo regionInfo;
|
||||
private String[] locations;
|
||||
private TableSnapshotInputFormatImpl.InputSplit delegate;
|
||||
|
||||
// constructor for mapreduce framework / Writable
|
||||
public TableSnapshotRegionSplit() { }
|
||||
|
||||
TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
|
||||
this.htd = htd;
|
||||
this.regionInfo = regionInfo;
|
||||
if (locations == null || locations.isEmpty()) {
|
||||
this.locations = new String[0];
|
||||
} else {
|
||||
this.locations = locations.toArray(new String[locations.size()]);
|
||||
}
|
||||
public TableSnapshotRegionSplit() {
|
||||
this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
|
||||
}
|
||||
|
||||
public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
|
||||
List<String> locations) {
|
||||
this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() throws IOException, InterruptedException {
|
||||
//TODO: We can obtain the file sizes of the snapshot here.
|
||||
return 0;
|
||||
return delegate.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getLocations() throws IOException, InterruptedException {
|
||||
return locations;
|
||||
return delegate.getLocations();
|
||||
}
|
||||
|
||||
// TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
|
||||
// doing this wrapping with Writables.
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
|
||||
MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
|
||||
.setTable(htd.convert())
|
||||
.setRegion(HRegionInfo.convert(regionInfo));
|
||||
|
||||
for (String location : locations) {
|
||||
builder.addLocations(location);
|
||||
}
|
||||
|
||||
MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
split.writeTo(baos);
|
||||
baos.close();
|
||||
byte[] buf = baos.toByteArray();
|
||||
out.writeInt(buf.length);
|
||||
out.write(buf);
|
||||
delegate.write(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
int len = in.readInt();
|
||||
byte[] buf = new byte[len];
|
||||
in.readFully(buf);
|
||||
MapReduceProtos.TableSnapshotRegionSplit split =
|
||||
MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
|
||||
this.htd = HTableDescriptor.convert(split.getTable());
|
||||
this.regionInfo = HRegionInfo.convert(split.getRegion());
|
||||
List<String> locationsList = split.getLocationsList();
|
||||
this.locations = locationsList.toArray(new String[locationsList.size()]);
|
||||
delegate.readFields(in);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class TableSnapshotRegionRecordReader extends
|
||||
RecordReader<ImmutableBytesWritable, Result> {
|
||||
private TableSnapshotRegionSplit split;
|
||||
private Scan scan;
|
||||
private Result result = null;
|
||||
private ImmutableBytesWritable row = null;
|
||||
private ClientSideRegionScanner scanner;
|
||||
RecordReader<ImmutableBytesWritable, Result> {
|
||||
private TableSnapshotInputFormatImpl.RecordReader delegate =
|
||||
new TableSnapshotInputFormatImpl.RecordReader();
|
||||
private TaskAttemptContext context;
|
||||
private Method getCounter;
|
||||
|
||||
@Override
|
||||
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
|
||||
InterruptedException {
|
||||
|
||||
Configuration conf = context.getConfiguration();
|
||||
this.split = (TableSnapshotRegionSplit) split;
|
||||
HTableDescriptor htd = this.split.htd;
|
||||
HRegionInfo hri = this.split.regionInfo;
|
||||
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
|
||||
|
||||
Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
|
||||
// directory where snapshot was restored
|
||||
|
||||
// create scan
|
||||
String scanStr = conf.get(TableInputFormat.SCAN);
|
||||
if (scanStr == null) {
|
||||
throw new IllegalArgumentException("A Scan is not configured for this job");
|
||||
}
|
||||
scan = TableMapReduceUtil.convertStringToScan(scanStr);
|
||||
// region is immutable, this should be fine,
|
||||
// otherwise we have to set the thread read point
|
||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||
// disable caching of data blocks
|
||||
scan.setCacheBlocks(false);
|
||||
|
||||
scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
|
||||
this.context = context;
|
||||
getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
|
||||
delegate.initialize(
|
||||
((TableSnapshotRegionSplit) split).delegate,
|
||||
context.getConfiguration());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextKeyValue() throws IOException, InterruptedException {
|
||||
result = scanner.next();
|
||||
if (result == null) {
|
||||
//we are done
|
||||
return false;
|
||||
boolean result = delegate.nextKeyValue();
|
||||
if (result) {
|
||||
ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
|
||||
if (scanMetrics != null && context != null) {
|
||||
TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.row == null) {
|
||||
this.row = new ImmutableBytesWritable();
|
||||
}
|
||||
this.row.set(result.getRow());
|
||||
|
||||
ScanMetrics scanMetrics = scanner.getScanMetrics();
|
||||
if (scanMetrics != null && context != null) {
|
||||
TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
|
||||
return row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result getCurrentValue() throws IOException, InterruptedException {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
|
||||
return delegate.getCurrentKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result getCurrentValue() throws IOException, InterruptedException {
|
||||
return delegate.getCurrentValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getProgress() throws IOException, InterruptedException {
|
||||
return 0; // TODO: use total bytes to estimate
|
||||
return delegate.getProgress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (this.scanner != null) {
|
||||
this.scanner.close();
|
||||
}
|
||||
delegate.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,88 +187,12 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
|||
|
||||
@Override
|
||||
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
String snapshotName = getSnapshotName(conf);
|
||||
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = rootDir.getFileSystem(conf);
|
||||
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
|
||||
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
|
||||
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
|
||||
if (regionManifests == null) {
|
||||
throw new IllegalArgumentException("Snapshot seems empty");
|
||||
List<InputSplit> results = new ArrayList<InputSplit>();
|
||||
for (TableSnapshotInputFormatImpl.InputSplit split :
|
||||
TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
|
||||
results.add(new TableSnapshotRegionSplit(split));
|
||||
}
|
||||
|
||||
// load table descriptor
|
||||
HTableDescriptor htd = manifest.getTableDescriptor();
|
||||
|
||||
Scan scan = TableMapReduceUtil.convertStringToScan(conf
|
||||
.get(TableInputFormat.SCAN));
|
||||
Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
|
||||
|
||||
List<InputSplit> splits = new ArrayList<InputSplit>();
|
||||
for (SnapshotRegionManifest regionManifest : regionManifests) {
|
||||
// load region descriptor
|
||||
HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
|
||||
|
||||
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
|
||||
hri.getStartKey(), hri.getEndKey())) {
|
||||
// compute HDFS locations from snapshot files (which will get the locations for
|
||||
// referred hfiles)
|
||||
List<String> hosts = getBestLocations(conf,
|
||||
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
|
||||
|
||||
int len = Math.min(3, hosts.size());
|
||||
hosts = hosts.subList(0, len);
|
||||
splits.add(new TableSnapshotRegionSplit(htd, hri, hosts));
|
||||
}
|
||||
}
|
||||
|
||||
return splits;
|
||||
}
|
||||
|
||||
/**
|
||||
* This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
|
||||
* weights into account, thus will treat every location passed from the input split as equal. We
|
||||
* do not want to blindly pass all the locations, since we are creating one split per region, and
|
||||
* the region's blocks are all distributed throughout the cluster unless favorite node assignment
|
||||
* is used. On the expected stable case, only one location will contain most of the blocks as
|
||||
* local.
|
||||
* On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
|
||||
* we are doing a simple heuristic, where we will pass all hosts which have at least 80%
|
||||
* (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
|
||||
* host with the best locality.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
|
||||
List<String> locations = new ArrayList<String>(3);
|
||||
|
||||
HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
|
||||
|
||||
if (hostAndWeights.length == 0) {
|
||||
return locations;
|
||||
}
|
||||
|
||||
HostAndWeight topHost = hostAndWeights[0];
|
||||
locations.add(topHost.getHost());
|
||||
|
||||
// Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
|
||||
double cutoffMultiplier
|
||||
= conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
|
||||
|
||||
double filterWeight = topHost.getWeight() * cutoffMultiplier;
|
||||
|
||||
for (int i = 1; i < hostAndWeights.length; i++) {
|
||||
if (hostAndWeights[i].getWeight() >= filterWeight) {
|
||||
locations.add(hostAndWeights[i].getHost());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return locations;
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -371,26 +204,8 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
|||
* After the job is finished, restoreDir can be deleted.
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(SNAPSHOT_NAME_KEY, snapshotName);
|
||||
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = rootDir.getFileSystem(conf);
|
||||
|
||||
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
|
||||
|
||||
// TODO: restore from record readers to parallelize.
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
|
||||
conf.set(TABLE_DIR_KEY, restoreDir.toString());
|
||||
}
|
||||
|
||||
private static String getSnapshotName(Configuration conf) {
|
||||
String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
|
||||
if (snapshotName == null) {
|
||||
throw new IllegalArgumentException("Snapshot name must be provided");
|
||||
}
|
||||
return snapshotName;
|
||||
public static void setInput(Job job, String snapshotName, Path restoreDir)
|
||||
throws IOException {
|
||||
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,356 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
|
||||
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class TableSnapshotInputFormatImpl {
|
||||
// TODO: Snapshot files are owned in fs by the hbase user. There is no
// easy way to delegate access.
|
||||
|
||||
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
|
||||
private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
|
||||
|
||||
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
|
||||
private static final String LOCALITY_CUTOFF_MULTIPLIER =
|
||||
"hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
|
||||
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
|
||||
|
||||
/**
|
||||
* Implementation class for InputSplit logic common between mapred and mapreduce.
|
||||
*/
|
||||
public static class InputSplit implements Writable {
|
||||
private HTableDescriptor htd;
|
||||
private HRegionInfo regionInfo;
|
||||
private String[] locations;
|
||||
|
||||
// constructor for mapreduce framework / Writable
|
||||
public InputSplit() {}
|
||||
|
||||
public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
|
||||
this.htd = htd;
|
||||
this.regionInfo = regionInfo;
|
||||
if (locations == null || locations.isEmpty()) {
|
||||
this.locations = new String[0];
|
||||
} else {
|
||||
this.locations = locations.toArray(new String[locations.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
public long getLength() {
|
||||
//TODO: We can obtain the file sizes of the snapshot here.
|
||||
return 0;
|
||||
}
|
||||
|
||||
public String[] getLocations() {
|
||||
return locations;
|
||||
}
|
||||
|
||||
public HTableDescriptor getTableDescriptor() {
|
||||
return htd;
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
return regionInfo;
|
||||
}
|
||||
|
||||
// TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
|
||||
// doing this wrapping with Writables.
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
|
||||
.setTable(htd.convert())
|
||||
.setRegion(HRegionInfo.convert(regionInfo));
|
||||
|
||||
for (String location : locations) {
|
||||
builder.addLocations(location);
|
||||
}
|
||||
|
||||
TableSnapshotRegionSplit split = builder.build();
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
split.writeTo(baos);
|
||||
baos.close();
|
||||
byte[] buf = baos.toByteArray();
|
||||
out.writeInt(buf.length);
|
||||
out.write(buf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
int len = in.readInt();
|
||||
byte[] buf = new byte[len];
|
||||
in.readFully(buf);
|
||||
TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
|
||||
this.htd = HTableDescriptor.convert(split.getTable());
|
||||
this.regionInfo = HRegionInfo.convert(split.getRegion());
|
||||
List<String> locationsList = split.getLocationsList();
|
||||
this.locations = locationsList.toArray(new String[locationsList.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation class for RecordReader logic common between mapred and mapreduce.
|
||||
*/
|
||||
public static class RecordReader {
|
||||
private InputSplit split;
|
||||
private Scan scan;
|
||||
private Result result = null;
|
||||
private ImmutableBytesWritable row = null;
|
||||
private ClientSideRegionScanner scanner;
|
||||
|
||||
public ClientSideRegionScanner getScanner() {
|
||||
return scanner;
|
||||
}
|
||||
|
||||
public void initialize(InputSplit split, Configuration conf) throws IOException {
|
||||
this.split = split;
|
||||
HTableDescriptor htd = split.htd;
|
||||
HRegionInfo hri = this.split.getRegionInfo();
|
||||
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
|
||||
|
||||
Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
|
||||
// directory where snapshot was restored
|
||||
|
||||
// create scan
|
||||
// TODO: mapred does not support scan as input API. Work around for now.
|
||||
if (conf.get(TableInputFormat.SCAN) != null) {
|
||||
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
|
||||
} else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
|
||||
String[] columns =
|
||||
conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
|
||||
scan = new Scan();
|
||||
for (String col : columns) {
|
||||
scan.addFamily(Bytes.toBytes(col));
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("A Scan is not configured for this job");
|
||||
}
|
||||
|
||||
// region is immutable, this should be fine,
|
||||
// otherwise we have to set the thread read point
|
||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||
// disable caching of data blocks
|
||||
scan.setCacheBlocks(false);
|
||||
|
||||
scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
|
||||
}
|
||||
|
||||
public boolean nextKeyValue() throws IOException {
|
||||
result = scanner.next();
|
||||
if (result == null) {
|
||||
//we are done
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this.row == null) {
|
||||
this.row = new ImmutableBytesWritable();
|
||||
}
|
||||
this.row.set(result.getRow());
|
||||
return true;
|
||||
}
|
||||
|
||||
public ImmutableBytesWritable getCurrentKey() {
|
||||
return row;
|
||||
}
|
||||
|
||||
public Result getCurrentValue() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public long getPos() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public float getProgress() {
|
||||
return 0; // TODO: use total bytes to estimate
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (this.scanner != null) {
|
||||
this.scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static List<InputSplit> getSplits(Configuration conf) throws IOException {
|
||||
String snapshotName = getSnapshotName(conf);
|
||||
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = rootDir.getFileSystem(conf);
|
||||
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
|
||||
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
|
||||
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
|
||||
if (regionManifests == null) {
|
||||
throw new IllegalArgumentException("Snapshot seems empty");
|
||||
}
|
||||
|
||||
// load table descriptor
|
||||
HTableDescriptor htd = manifest.getTableDescriptor();
|
||||
|
||||
// TODO: mapred does not support scan as input API. Work around for now.
|
||||
Scan scan = null;
|
||||
if (conf.get(TableInputFormat.SCAN) != null) {
|
||||
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
|
||||
} else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
|
||||
String[] columns =
|
||||
conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
|
||||
scan = new Scan();
|
||||
for (String col : columns) {
|
||||
scan.addFamily(Bytes.toBytes(col));
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unable to create scan");
|
||||
}
|
||||
Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
|
||||
|
||||
List<InputSplit> splits = new ArrayList<InputSplit>();
|
||||
for (SnapshotRegionManifest regionManifest : regionManifests) {
|
||||
// load region descriptor
|
||||
HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
|
||||
|
||||
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
|
||||
hri.getStartKey(), hri.getEndKey())) {
|
||||
// compute HDFS locations from snapshot files (which will get the locations for
|
||||
// referred hfiles)
|
||||
List<String> hosts = getBestLocations(conf,
|
||||
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
|
||||
|
||||
int len = Math.min(3, hosts.size());
|
||||
hosts = hosts.subList(0, len);
|
||||
splits.add(new InputSplit(htd, hri, hosts));
|
||||
}
|
||||
}
|
||||
|
||||
return splits;
|
||||
}
|
||||
|
||||
/**
|
||||
* This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
* weights into account, thus will treat every location passed from the input split as equal. We
* do not want to blindly pass all the locations, since we are creating one split per region, and
* the region's blocks are all distributed throughout the cluster unless favored node assignment
* is used. In the expected stable case, only one location will contain most of the blocks as
* local.
* On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
* we are doing a simple heuristic, where we will pass all hosts which have at least 80%
* (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
* host with the best locality.
*/
|
||||
public static List<String> getBestLocations(
|
||||
Configuration conf, HDFSBlocksDistribution blockDistribution) {
|
||||
List<String> locations = new ArrayList<String>(3);
|
||||
|
||||
HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
|
||||
|
||||
if (hostAndWeights.length == 0) {
|
||||
return locations;
|
||||
}
|
||||
|
||||
HostAndWeight topHost = hostAndWeights[0];
|
||||
locations.add(topHost.getHost());
|
||||
|
||||
// Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
|
||||
double cutoffMultiplier
|
||||
= conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
|
||||
|
||||
double filterWeight = topHost.getWeight() * cutoffMultiplier;
|
||||
|
||||
for (int i = 1; i < hostAndWeights.length; i++) {
|
||||
if (hostAndWeights[i].getWeight() >= filterWeight) {
|
||||
locations.add(hostAndWeights[i].getHost());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return locations;
|
||||
}
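// Illustrative worked example (not part of this patch) of the cutoff heuristic above, using
// hypothetical block weights and the default multiplier of 0.8. getTopHostsWithWeights()
// returns hosts sorted by descending weight, and every host with at least 0.8 * topWeight is
// kept until the first host below the cutoff:
//
//   top hosts/weights : h1=100, h2=85, h3=70, h4=10
//   filterWeight      : 100 * 0.8 = 80
//   result            : [h1, h2]   // h3 (70 < 80) breaks the loop; h4 is never reached
//
// getSplits above then trims the list to at most three hosts per split.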
|
||||
|
||||
private static String getSnapshotName(Configuration conf) {
|
||||
String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
|
||||
if (snapshotName == null) {
|
||||
throw new IllegalArgumentException("Snapshot name must be provided");
|
||||
}
|
||||
return snapshotName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
|
||||
* @param conf the job configuration
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* After the job is finished, restoreDir can be deleted.
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
|
||||
throws IOException {
|
||||
conf.set(SNAPSHOT_NAME_KEY, snapshotName);
|
||||
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = rootDir.getFileSystem(conf);
|
||||
|
||||
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
|
||||
|
||||
// TODO: restore from record readers to parallelize.
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
|
||||
conf.set(TABLE_DIR_KEY, restoreDir.toString());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,259 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapReduceBase;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reducer;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapred.RunningJob;
|
||||
import org.apache.hadoop.mapred.lib.NullOutputFormat;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
|
||||
|
||||
private static final byte[] aaa = Bytes.toBytes("aaa");
|
||||
private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
|
||||
private static final String COLUMNS =
|
||||
Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);
|
||||
|
||||
@Override
|
||||
protected byte[] getStartRow() {
|
||||
return aaa;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] getEndRow() {
|
||||
return after_zzz;
|
||||
}
|
||||
|
||||
static class TestTableSnapshotMapper extends MapReduceBase
|
||||
implements TableMap<ImmutableBytesWritable, NullWritable> {
|
||||
@Override
|
||||
public void map(ImmutableBytesWritable key, Result value,
|
||||
OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
|
||||
throws IOException {
|
||||
verifyRowFromMap(key, value);
|
||||
collector.collect(key, NullWritable.get());
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTableSnapshotReducer extends MapReduceBase
|
||||
implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
|
||||
HBaseTestingUtility.SeenRowTracker rowTracker =
|
||||
new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);
|
||||
|
||||
@Override
|
||||
public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
|
||||
OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
|
||||
throws IOException {
|
||||
rowTracker.addRow(key.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
rowTracker.validate();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitTableSnapshotMapperJobConfig() throws Exception {
|
||||
setupCluster();
|
||||
TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
|
||||
String snapshotName = "foo";
|
||||
|
||||
try {
|
||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
|
||||
JobConf job = new JobConf(UTIL.getConfiguration());
|
||||
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||
|
||||
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
|
||||
COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
|
||||
NullWritable.class, job, false, tmpTableDir);
|
||||
|
||||
// TODO: would be better to examine directly the cache instance that results from this
|
||||
// config. Currently this is not possible because BlockCache initialization is static.
|
||||
Assert.assertEquals(
|
||||
"Snapshot job should be configured for default LruBlockCache.",
|
||||
HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
|
||||
job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
|
||||
Assert.assertEquals(
|
||||
"Snapshot job should not use SlabCache.",
|
||||
0, job.getFloat("hbase.offheapcache.percentage", -1), 0.01);
|
||||
Assert.assertEquals(
|
||||
"Snapshot job should not use BucketCache.",
|
||||
0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
|
||||
} finally {
|
||||
UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
|
||||
UTIL.deleteTable(tableName);
|
||||
tearDownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: mapred does not support limiting input range by startrow, endrow.
|
||||
// Thus the following tests must override parameter verification.
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testWithMockedMapReduceMultiRegion() throws Exception {
|
||||
testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testWithMapReduceMultiRegion() throws Exception {
|
||||
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
// run the MR job while HBase is offline
|
||||
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
|
||||
testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
|
||||
int numRegions, int expectedNumSplits) throws Exception {
|
||||
setupCluster();
|
||||
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
|
||||
try {
|
||||
createTableAndSnapshot(
|
||||
util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
|
||||
|
||||
JobConf job = new JobConf(util.getConfiguration());
|
||||
Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
|
||||
|
||||
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
|
||||
COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
|
||||
NullWritable.class, job, false, tmpTableDir);
|
||||
|
||||
// mapred doesn't support start and end keys? o.O
|
||||
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
|
||||
|
||||
} finally {
|
||||
util.getHBaseAdmin().deleteSnapshot(snapshotName);
|
||||
util.deleteTable(tableName);
|
||||
tearDownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
|
||||
byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
|
||||
TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
|
||||
InputSplit[] splits = tsif.getSplits(job, 0);
|
||||
|
||||
Assert.assertEquals(expectedNumSplits, splits.length);
|
||||
|
||||
HBaseTestingUtility.SeenRowTracker rowTracker =
|
||||
new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
|
||||
|
||||
for (int i = 0; i < splits.length; i++) {
|
||||
// validate input split
|
||||
InputSplit split = splits[i];
|
||||
Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
|
||||
|
||||
// validate record reader
|
||||
OutputCollector collector = mock(OutputCollector.class);
|
||||
Reporter reporter = mock(Reporter.class);
|
||||
RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);
|
||||
|
||||
// validate we can read all the data back
|
||||
ImmutableBytesWritable key = rr.createKey();
|
||||
Result value = rr.createValue();
|
||||
while (rr.next(key, value)) {
|
||||
verifyRowFromMap(key, value);
|
||||
rowTracker.addRow(key.copyBytes());
|
||||
}
|
||||
|
||||
rr.close();
|
||||
}
|
||||
|
||||
// validate all rows are seen
|
||||
rowTracker.validate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
|
||||
String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
|
||||
boolean shutdownCluster) throws Exception {
|
||||
doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
|
||||
numRegions, expectedNumSplits, shutdownCluster);
|
||||
}
|
||||
|
||||
// this is also called by the IntegrationTestTableSnapshotInputFormat
|
||||
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
|
||||
String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
|
||||
int expectedNumSplits, boolean shutdownCluster) throws Exception {
|
||||
|
||||
//create the table and snapshot
|
||||
createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
|
||||
|
||||
if (shutdownCluster) {
|
||||
util.shutdownMiniHBaseCluster();
|
||||
}
|
||||
|
||||
try {
|
||||
// create the job
|
||||
JobConf jobConf = new JobConf(util.getConfiguration());
|
||||
|
||||
jobConf.setJarByClass(util.getClass());
|
||||
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
|
||||
TestTableSnapshotInputFormat.class);
|
||||
|
||||
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
|
||||
TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
|
||||
NullWritable.class, jobConf, true, tableDir);
|
||||
|
||||
jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
|
||||
jobConf.setNumReduceTasks(1);
|
||||
jobConf.setOutputFormat(NullOutputFormat.class);
|
||||
|
||||
RunningJob job = JobClient.runJob(jobConf);
|
||||
Assert.assertTrue(job.isSuccessful());
|
||||
} finally {
|
||||
if (!shutdownCluster) {
|
||||
util.getHBaseAdmin().deleteSnapshot(snapshotName);
|
||||
util.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
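For reference, a standalone driver against the new mapred API would be wired up much like the test above. The following is only a sketch: the snapshot name "my_snapshot", the column spec "f1 f2", the SnapshotScanDriver/SnapshotMapper classes and the restore path are placeholders, not identifiers from this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SnapshotScanDriver {

  // Placeholder mapper; a real job would emit something useful per row.
  public static class SnapshotMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter) {
      // inspect key/value here
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), SnapshotScanDriver.class);
    // Columns are passed as a space-separated column spec, as with the other mapred
    // table jobs; the last argument is a temporary directory where the snapshot is
    // restored for the duration of the job.
    TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "f1 f2",
      SnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
      job, true, new Path("/tmp/snapshot_restore"));
    job.setNumReduceTasks(0);
    job.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(job);
  }
}

Note that, unlike the mapreduce flavour, there is no Scan argument here, which is exactly why the mapred tests above cannot restrict the verified key range.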
@@ -0,0 +1,173 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;

public abstract class TableSnapshotInputFormatTestBase {

  protected final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected static final int NUM_REGION_SERVERS = 2;
  protected static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};

  protected FileSystem fs;
  protected Path rootDir;

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(NUM_REGION_SERVERS);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception;

  protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception;

  protected abstract byte[] getStartRow();

  protected abstract byte[] getEndRow();

  @Test
  public void testWithMockedMapReduceSingleRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
  }

  @Test
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
  }

  @Test
  public void testWithMapReduceSingleRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
  }

  @Test
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
  }

  @Test
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
  }

  protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
    setupCluster();
    util.startMiniMapReduceCluster();
    try {
      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
      TableName tableName = TableName.valueOf("testWithMapReduce");
      testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions,
        expectedNumSplits, shutdownCluster);
    } finally {
      util.shutdownMiniMapReduceCluster();
      tearDownCluster();
    }
  }

  protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
      throws IOException {
    byte[] row = key.get();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], null);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
        + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
      throws Exception {
    try {
      util.deleteTable(tableName);
    } catch(Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    HBaseAdmin admin = util.getHBaseAdmin();

    // put some stuff in the table
    HTable table = new HTable(util.getConfiguration(), tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName.toString());
    table.close();
  }

}
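The two concrete flavours in this patch (mapred and mapreduce) plug into this base class by supplying a row range and the two hooks above. Schematically, a subclass looks like the sketch below; the class name and method bodies are illustrative placeholders, not code from this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.util.Bytes;

public class TestSomeFlavourTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @Override
  protected byte[] getStartRow() {
    return Bytes.toBytes("bbb"); // first row the flavour-specific scan should see
  }

  @Override
  protected byte[] getEndRow() {
    return Bytes.toBytes("yyy"); // exclusive end row
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    // configure the flavour-specific job, then verify splits and record readers directly
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    // run a full MR job over the restored snapshot
  }
}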
@@ -22,33 +22,20 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;

@@ -64,34 +51,19 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestTableSnapshotInputFormat {
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(NUM_REGION_SERVERS);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @After
@@ -100,7 +72,7 @@ public class TestTableSnapshotInputFormat {

  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
@@ -169,41 +141,6 @@ public class TestTableSnapshotInputFormat {
    }
  }

  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions)
      throws Exception {
    try {
      util.deleteTable(tableName);
    } catch(Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    HBaseAdmin admin = util.getHBaseAdmin();

    // put some stuff in the table
    HTable table = new HTable(util.getConfiguration(), tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName.toString());
    table.close();
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
@@ -211,7 +148,7 @@ public class TestTableSnapshotInputFormat {
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 1);
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

@@ -238,32 +175,23 @@ public class TestTableSnapshotInputFormat {
    }
  }

  @Test
  public void testWithMockedMapReduceSingleRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
  }

  @Test
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
  }

  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Job job = new Job(util.getConfiguration());
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(bbb, yyy); // limit the scan
      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
@@ -309,63 +237,21 @@ public class TestTableSnapshotInputFormat {
    rowTracker.validate();
  }

  public static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
      throws IOException {
    byte[] row = key.get();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], null);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
        + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  @Test
  public void testWithMapReduceSingleRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
  }

  @Test
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
  }

  @Test
  // run the MR job while HBase is offline
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
  }

  private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
    setupCluster();
    util.startMiniMapReduceCluster();
    try {
      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
      TableName tableName = TableName.valueOf("testWithMapReduce");
      doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
        expectedNumSplits, shutdownCluster);
    } finally {
      util.shutdownMiniMapReduceCluster();
      tearDownCluster();
    }
  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, numRegions);
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
@@ -374,7 +260,7 @@ public class TestTableSnapshotInputFormat {
    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(bbb, yyy); // limit the scan
      Scan scan = new Scan(startRow, endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),