HBASE-13356 HBase should provide an InputFormat supporting multiple scans in mapreduce jobs over snapshots (Andrew Mains)

This commit is contained in:
tedyu 2015-06-02 19:36:17 -07:00
parent 41bfe40cf8
commit 39ab55841d
16 changed files with 1555 additions and 64 deletions

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred
* .TableSnapshotInputFormat}
* allowing a MapReduce job to run over one or more table snapshots, with one or more scans
* configured for each.
* Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce
* .TableSnapshotInputFormat}
* and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce
* .TableSnapshotInputFormat} for
* more details.
* Usage is similar to TableSnapshotInputFormat, with the following exception:
* initMultiTableSnapshotMapperJob takes in a map
* from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
* scan will be applied;
* the overall dataset for the job is defined by the concatenation of the regions and tables
* included in each snapshot/scan
* pair.
* {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
* Class, Class, Class, JobConf, boolean, Path)}
* can be used to configure the job.
* <pre>{@code
* Job job = new Job(conf);
* Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
* "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
* "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
* );
* Path restoreDir = new Path("/tmp/snapshot_restore_dir")
* TableMapReduceUtil.initTableSnapshotMapperJob(
* snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
* MyMapOutputValueWritable.class, job, true, restoreDir);
* }
* </pre>
* Internally, this input format restores each snapshot into a subdirectory of the given tmp
* directory. Input splits and
* record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
* .TableSnapshotInputFormat}
* (one per region).
* See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
* permissioning; the
* same caveats apply here.
*
* @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
* @see org.apache.hadoop.hbase.client.TableSnapshotScanner
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
implements InputFormat<ImmutableBytesWritable, Result> {
private final MultiTableSnapshotInputFormatImpl delegate;
public MultiTableSnapshotInputFormat() {
this.delegate = new MultiTableSnapshotInputFormatImpl();
}
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
InputSplit[] results = new InputSplit[splits.size()];
for (int i = 0; i < splits.size(); i++) {
results[i] = new TableSnapshotRegionSplit(splits.get(i));
}
return results;
}
@Override
public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
}
/**
* Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
* restoreDir.
* Sets: {@link org.apache.hadoop.hbase.mapreduce
* .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
* {@link org.apache.hadoop.hbase.mapreduce
* .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
*
* @param conf
* @param snapshotScans
* @param restoreDir
* @throws IOException
*/
public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
Path restoreDir) throws IOException {
new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -30,11 +32,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
@ -127,6 +128,40 @@ public class TableMapReduceUtil {
}
}
/**
* Sets up the job for reading from one or more multiple table snapshots, with one or more scans
* per snapshot.
* It bypasses hbase servers and read directly from snapshot files.
*
* @param snapshotScans map of snapshot name to scans on that snapshot.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
job.setInputFormat(MultiTableSnapshotInputFormat.class);
if (outputValueClass != null) {
job.setMapOutputValueClass(outputValueClass);
}
if (outputKeyClass != null) {
job.setMapOutputKeyClass(outputKeyClass);
}
job.setMapperClass(mapper);
if (addDependencyJars) {
addDependencyJars(job);
}
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
}
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.

View File

@ -18,12 +18,13 @@
package org.apache.hadoop.hbase.mapred;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
@ -60,8 +61,9 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWrita
}
public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
List<String> locations) {
this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
List<String> locations, Scan scan, Path restoreDir) {
this.delegate =
new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* MultiTableSnapshotInputFormat generalizes
* {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
* allowing a MapReduce job to run over one or more table snapshots, with one or more scans
* configured for each.
* Internally, the input format delegates to
* {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
* and thus has the same performance advantages;
* see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
* more details.
* Usage is similar to TableSnapshotInputFormat, with the following exception:
* initMultiTableSnapshotMapperJob takes in a map
* from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
* scan will be applied;
* the overall dataset for the job is defined by the concatenation of the regions and tables
* included in each snapshot/scan
* pair.
* {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob
* (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache
* .hadoop.fs.Path)}
* can be used to configure the job.
* <pre>{@code
* Job job = new Job(conf);
* Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
* "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
* "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
* );
* Path restoreDir = new Path("/tmp/snapshot_restore_dir")
* TableMapReduceUtil.initTableSnapshotMapperJob(
* snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
* MyMapOutputValueWritable.class, job, true, restoreDir);
* }
* </pre>
* Internally, this input format restores each snapshot into a subdirectory of the given tmp
* directory. Input splits and
* record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
* .TableSnapshotInputFormat}
* (one per region).
* See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
* permissioning; the
* same caveats apply here.
*
* @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
* @see org.apache.hadoop.hbase.client.TableSnapshotScanner
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
private final MultiTableSnapshotInputFormatImpl delegate;
public MultiTableSnapshotInputFormat() {
this.delegate = new MultiTableSnapshotInputFormatImpl();
}
@Override
public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
List<TableSnapshotInputFormatImpl.InputSplit> splits =
delegate.getSplits(jobContext.getConfiguration());
List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
}
return rtn;
}
public static void setInput(Configuration configuration,
Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
}
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ConfigurationUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* Shared implementation of mapreduce code over multiple table snapshots.
* Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce
* .MultiTableSnapshotInputFormat} and mapred
* ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
*/
@InterfaceAudience.LimitedPrivate({ "HBase" })
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormatImpl {
private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
public static final String RESTORE_DIRS_KEY =
"hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
public static final String SNAPSHOT_TO_SCANS_KEY =
"hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
/**
* Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
* restoreDir.
* Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
*
* @param conf
* @param snapshotScans
* @param restoreDir
* @throws IOException
*/
public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
Path restoreDir) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
setSnapshotToScans(conf, snapshotScans);
Map<String, Path> restoreDirs =
generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
setSnapshotDirs(conf, restoreDirs);
restoreSnapshots(conf, restoreDirs, fs);
}
/**
* Return the list of splits extracted from the scans/snapshots pushed to conf by
* {@link
* #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
*
* @param conf Configuration to determine splits from
* @return Return the list of splits extracted from the scans/snapshots pushed to conf
* @throws IOException
*/
public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
String snapshotName = entry.getKey();
Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
SnapshotManifest manifest =
TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
List<HRegionInfo> regionInfos =
TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
for (Scan scan : entry.getValue()) {
List<TableSnapshotInputFormatImpl.InputSplit> splits =
TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
rtn.addAll(splits);
}
}
return rtn;
}
/**
* Retrieve the snapshot name -> list<scan> mapping pushed to configuration by
* {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
*
* @param conf Configuration to extract name -> list<scan> mappings from.
* @return the snapshot name -> list<scan> mapping pushed to configuration
* @throws IOException
*/
public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
Map<String, Collection<Scan>> rtn = Maps.newHashMap();
for (Map.Entry<String, String> entry : ConfigurationUtil
.getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
String snapshotName = entry.getKey();
String scan = entry.getValue();
Collection<Scan> snapshotScans = rtn.get(snapshotName);
if (snapshotScans == null) {
snapshotScans = Lists.newArrayList();
rtn.put(snapshotName, snapshotScans);
}
snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
}
return rtn;
}
/**
* Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
*
* @param conf
* @param snapshotScans
* @throws IOException
*/
public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
throws IOException {
// flatten out snapshotScans for serialization to the job conf
List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
String snapshotName = entry.getKey();
Collection<Scan> scans = entry.getValue();
// serialize all scans and map them to the appropriate snapshot
for (Scan scan : scans) {
snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<String, String>(snapshotName,
TableMapReduceUtil.convertScanToString(scan)));
}
}
ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
}
/**
* Retrieve the directories into which snapshots have been restored from
* ({@link #RESTORE_DIRS_KEY})
*
* @param conf Configuration to extract restore directories from
* @return the directories into which snapshots have been restored from
* @throws IOException
*/
public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
for (Map.Entry<String, String> kvp : kvps) {
rtn.put(kvp.getKey(), new Path(kvp.getValue()));
}
return rtn;
}
public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
Map<String, String> toSet = Maps.newHashMap();
for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
toSet.put(entry.getKey(), entry.getValue().toString());
}
ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
}
/**
* Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
* return a map from the snapshot to the restore directory.
*
* @param snapshots collection of snapshot names to restore
* @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
* @return a mapping from snapshot name to the directory in which that snapshot has been restored
*/
private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
Path baseRestoreDir) {
Map<String, Path> rtn = Maps.newHashMap();
for (String snapshotName : snapshots) {
Path restoreSnapshotDir =
new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
rtn.put(snapshotName, restoreSnapshotDir);
}
return rtn;
}
/**
* Restore each (snapshot name, restore directory) pair in snapshotToDir
*
* @param conf configuration to restore with
* @param snapshotToDir mapping from snapshot names to restore directories
* @param fs filesystem to do snapshot restoration on
* @throws IOException
*/
public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
throws IOException {
// TODO: restore from record readers to parallelize.
Path rootDir = FSUtils.getRootDir(conf);
for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
String snapshotName = entry.getKey();
Path restoreDir = entry.getValue();
LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
+ " for MultiTableSnapshotInputFormat");
restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
}
}
void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
FileSystem fs) throws IOException {
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
}
// TODO: these probably belong elsewhere/may already be implemented elsewhere.
}

View File

@ -22,13 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@ -305,6 +299,43 @@ public class TableMapReduceUtil {
conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
}
/**
* Sets up the job for reading from one or more table snapshots, with one or more scans
* per snapshot.
* It bypasses hbase servers and read directly from snapshot files.
*
* @param snapshotScans map of snapshot name to scans on that snapshot.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
if (outputValueClass != null) {
job.setMapOutputValueClass(outputValueClass);
}
if (outputKeyClass != null) {
job.setMapOutputKeyClass(outputKeyClass);
}
job.setMapperClass(mapper);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
if (addDependencyJars) {
addDependencyJars(job);
}
resetCacheConfig(job.getConfiguration());
}
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.

View File

@ -18,19 +18,14 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
@ -41,7 +36,12 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
/**
* TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
@ -98,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
}
public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
List<String> locations) {
this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
List<String> locations, Scan scan, Path restoreDir) {
this.delegate =
new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
@ -61,9 +64,11 @@ public class TableSnapshotInputFormatImpl {
// TODO: Snapshots files are owned in fs by the hbase user. There is no
// easy way to delegate access.
public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
// key for specifying the root dir of the restored snapshot
private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
private static final String LOCALITY_CUTOFF_MULTIPLIER =
@ -74,14 +79,18 @@ public class TableSnapshotInputFormatImpl {
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
private HTableDescriptor htd;
private HRegionInfo regionInfo;
private String[] locations;
private String scan;
private String restoreDir;
// constructor for mapreduce framework / Writable
public InputSplit() {}
public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
Scan scan, Path restoreDir) {
this.htd = htd;
this.regionInfo = regionInfo;
if (locations == null || locations.isEmpty()) {
@ -89,6 +98,25 @@ public class TableSnapshotInputFormatImpl {
} else {
this.locations = locations.toArray(new String[locations.size()]);
}
try {
this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
} catch (IOException e) {
LOG.warn("Failed to convert Scan to String", e);
}
this.restoreDir = restoreDir.toString();
}
public HTableDescriptor getHtd() {
return htd;
}
public String getScan() {
return scan;
}
public String getRestoreDir() {
return restoreDir;
}
public long getLength() {
@ -128,6 +156,10 @@ public class TableSnapshotInputFormatImpl {
byte[] buf = baos.toByteArray();
out.writeInt(buf.length);
out.write(buf);
Bytes.writeByteArray(out, Bytes.toBytes(scan));
Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
}
@Override
@ -140,6 +172,9 @@ public class TableSnapshotInputFormatImpl {
this.regionInfo = HRegionInfo.convert(split.getRegion());
List<String> locationsList = split.getLocationsList();
this.locations = locationsList.toArray(new String[locationsList.size()]);
this.scan = Bytes.toString(Bytes.readByteArray(in));
this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
}
}
@ -158,28 +193,12 @@ public class TableSnapshotInputFormatImpl {
}
public void initialize(InputSplit split, Configuration conf) throws IOException {
this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split;
HTableDescriptor htd = split.htd;
HRegionInfo hri = this.split.getRegionInfo();
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root
// directory where snapshot was restored
// create scan
// TODO: mapred does not support scan as input API. Work around for now.
if (conf.get(TableInputFormat.SCAN) != null) {
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
} else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
String[] columns =
conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
scan = new Scan();
for (String col : columns) {
scan.addFamily(Bytes.toBytes(col));
}
} else {
throw new IllegalArgumentException("A Scan is not configured for this job");
}
// region is immutable, this should be fine,
// otherwise we have to set the thread read point
@ -187,7 +206,8 @@ public class TableSnapshotInputFormatImpl {
// disable caching of data blocks
scan.setCacheBlocks(false);
scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
scanner =
new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
}
public boolean nextKeyValue() throws IOException {
@ -233,18 +253,40 @@ public class TableSnapshotInputFormatImpl {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
// TODO: mapred does not support scan as input API. Work around for now.
Scan scan = extractScanFromConf(conf);
// the temp dir where the snapshot is restored
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
return getSplits(scan, manifest, regionInfos, restoreDir, conf);
}
public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
if (regionManifests == null) {
throw new IllegalArgumentException("Snapshot seems empty");
}
// load table descriptor
HTableDescriptor htd = manifest.getTableDescriptor();
List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
// TODO: mapred does not support scan as input API. Work around for now.
for (SnapshotRegionManifest regionManifest : regionManifests) {
regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
}
return regionInfos;
}
public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
Path rootDir, FileSystem fs) throws IOException {
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
}
public static Scan extractScanFromConf(Configuration conf) throws IOException {
Scan scan = null;
if (conf.get(TableInputFormat.SCAN) != null) {
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
@ -258,17 +300,22 @@ public class TableSnapshotInputFormatImpl {
} else {
throw new IllegalArgumentException("Unable to create scan");
}
// the temp dir where the snapshot is restored
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
return scan;
}
public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
// load table descriptor
HTableDescriptor htd = manifest.getTableDescriptor();
Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
List<InputSplit> splits = new ArrayList<InputSplit>();
for (SnapshotRegionManifest regionManifest : regionManifests) {
for (HRegionInfo hri : regionManifests) {
// load region descriptor
HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())) {
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
hri.getEndKey())) {
// compute HDFS locations from snapshot files (which will get the locations for
// referred hfiles)
List<String> hosts = getBestLocations(conf,
@ -276,11 +323,12 @@ public class TableSnapshotInputFormatImpl {
int len = Math.min(3, hosts.size());
hosts = hosts.subList(0, len);
splits.add(new InputSplit(htd, hri, hosts));
splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
return splits;
}
/**
@ -335,7 +383,7 @@ public class TableSnapshotInputFormatImpl {
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param conf the job to configuration
* @param conf the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* Utilities for storing more complex collection types in
* {@link org.apache.hadoop.conf.Configuration} instances.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class ConfigurationUtil {
// TODO: hopefully this is a good delimiter; it's not in the base64 alphabet,
// nor is it valid for paths
public static final char KVP_DELIMITER = '^';
// Disallow instantiation
private ConfigurationUtil() {
}
/**
* Store a collection of Map.Entry's in conf, with each entry separated by ','
* and key values delimited by {@link #KVP_DELIMITER}
*
* @param conf configuration to store the collection in
* @param key overall key to store keyValues under
* @param keyValues kvps to be stored under key in conf
*/
public static void setKeyValues(Configuration conf, String key,
Collection<Map.Entry<String, String>> keyValues) {
setKeyValues(conf, key, keyValues, KVP_DELIMITER);
}
/**
* Store a collection of Map.Entry's in conf, with each entry separated by ','
* and key values delimited by delimiter.
*
* @param conf configuration to store the collection in
* @param key overall key to store keyValues under
* @param keyValues kvps to be stored under key in conf
* @param delimiter character used to separate each kvp
*/
public static void setKeyValues(Configuration conf, String key,
Collection<Map.Entry<String, String>> keyValues, char delimiter) {
List<String> serializedKvps = Lists.newArrayList();
for (Map.Entry<String, String> kvp : keyValues) {
serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue());
}
conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()]));
}
/**
* Retrieve a list of key value pairs from configuration, stored under the provided key
*
* @param conf configuration to retrieve kvps from
* @param key key under which the key values are stored
* @return the list of kvps stored under key in conf, or null if the key isn't present.
* @see #setKeyValues(Configuration, String, Collection, char)
*/
public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key) {
return getKeyValues(conf, key, KVP_DELIMITER);
}
/**
* Retrieve a list of key value pairs from configuration, stored under the provided key
*
* @param conf configuration to retrieve kvps from
* @param key key under which the key values are stored
* @param delimiter character used to separate each kvp
* @return the list of kvps stored under key in conf, or null if the key isn't present.
* @see #setKeyValues(Configuration, String, Collection, char)
*/
public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key,
char delimiter) {
String[] kvps = conf.getStrings(key);
if (kvps == null) {
return null;
}
List<Map.Entry<String, String>> rtn = Lists.newArrayList();
for (String kvp : kvps) {
String[] splitKvp = StringUtils.split(kvp, delimiter);
if (splitKvp.length != 2) {
throw new IllegalArgumentException(
"Expected key value pair for configuration key '" + key + "'" + " to be of form '<key>"
+ delimiter + "<value>; was " + kvp + " instead");
}
rtn.add(new AbstractMap.SimpleImmutableEntry<String, String>(splitKvp[0], splitKvp[1]));
}
return rtn;
}
}

View File

@ -131,7 +131,7 @@ import static org.junit.Assert.fail;
* Create an instance and keep it around testing HBase. This class is
* meant to be your one-stop shop for anything you might need testing. Manages
* one cluster at a time only. Managed cluster can be an in-process
* {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}.
* {@link MiniHBaseCluster}, or a deployed cluster of type {@link HBaseCluster}.
* Not all methods work with the real cluster.
* Depends on log4j being on classpath and
* hbase-site.xml for logging and test-run configuration. It does not set

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertTrue;
@Category({ LargeTests.class })
public class TestMultiTableSnapshotInputFormat
extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
@Override
protected void runJob(String jobName, Configuration c, List<Scan> scans)
throws IOException, InterruptedException, ClassNotFoundException {
JobConf job = new JobConf(TEST_UTIL.getConfiguration());
job.setJobName(jobName);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
TableMapReduceUtil.addDependencyJars(job);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(1); // one to get final "first" and "last" key
FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
LOG.info("Started " + job.getJobName());
RunningJob runningJob = JobClient.runJob(job);
runningJob.waitForCompletion();
assertTrue(runningJob.isSuccessful());
LOG.info("After map/reduce completion - job " + jobName);
}
public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
@Override
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
Reporter reporter) throws IOException {
makeAssertions(key, value);
outputCollector.collect(key, key);
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
}
@Override
public void configure(JobConf jobConf) {
}
}
public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
NullWritable, NullWritable> {
private JobConf jobConf;
@Override
public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
throws IOException {
makeAssertions(key, Lists.newArrayList(values));
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
super.cleanup(this.jobConf);
}
@Override
public void configure(JobConf jobConf) {
this.jobConf = jobConf;
}
}
}

View File

@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Base set of tests and setup for input formats touching multiple tables.
*/
public abstract class MultiTableInputFormatTestBase {
static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class);
public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static final String TABLE_NAME = "scantest";
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
static final String KEY_STARTROW = "startRow";
static final String KEY_LASTROW = "stpRow";
static List<String> TABLES = Lists.newArrayList();
static {
for (int i = 0; i < 3; i++) {
TABLES.add(TABLE_NAME + String.valueOf(i));
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// switch TIF to log at DEBUG level
TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
// start mini hbase cluster
TEST_UTIL.startMiniCluster(3);
// create and fill table
for (String tableName : TABLES) {
HTable table = null;
try {
table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4);
TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
} finally {
if (table != null) {
table.close();
}
}
}
// start MR cluster
TEST_UTIL.startMiniMapReduceCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniMapReduceCluster();
TEST_UTIL.shutdownMiniCluster();
}
@After
public void tearDown() throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
}
/**
* Pass the key and value to reducer.
*/
public static class ScanMapper extends
TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
/**
* Pass the key and value to reduce.
*
* @param key The key, here "aaa", "aab" etc.
* @param value The value is the same as the key.
* @param context The task context.
* @throws IOException When reading the rows fails.
*/
@Override
public void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
makeAssertions(key, value);
context.write(key, key);
}
public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
if (value.size() != 1) {
throw new IOException("There should only be one input column");
}
Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
value.getMap();
if (!cf.containsKey(INPUT_FAMILY)) {
throw new IOException("Wrong input columns. Missing: '" +
Bytes.toString(INPUT_FAMILY) + "'.");
}
String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
", value -> " + val);
}
}
/**
* Checks the last and first keys seen against the scanner boundaries.
*/
public static class ScanReducer
extends
Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
NullWritable, NullWritable> {
private String first = null;
private String last = null;
@Override
protected void reduce(ImmutableBytesWritable key,
Iterable<ImmutableBytesWritable> values, Context context)
throws IOException, InterruptedException {
makeAssertions(key, values);
}
protected void makeAssertions(ImmutableBytesWritable key,
Iterable<ImmutableBytesWritable> values) {
int count = 0;
for (ImmutableBytesWritable value : values) {
String val = Bytes.toStringBinary(value.get());
LOG.debug("reduce: key[" + count + "] -> " +
Bytes.toStringBinary(key.get()) + ", value -> " + val);
if (first == null) first = val;
last = val;
count++;
}
assertEquals(3, count);
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Configuration c = context.getConfiguration();
cleanup(c);
}
protected void cleanup(Configuration c) {
String startRow = c.get(KEY_STARTROW);
String lastRow = c.get(KEY_LASTROW);
LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
startRow + "\"");
LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
"\"");
if (startRow != null && startRow.length() > 0) {
assertEquals(startRow, first);
}
if (lastRow != null && lastRow.length() > 0) {
assertEquals(lastRow, last);
}
}
}
@Test
public void testScanEmptyToEmpty() throws IOException, InterruptedException,
ClassNotFoundException {
testScan(null, null, null);
}
@Test
public void testScanEmptyToAPP() throws IOException, InterruptedException,
ClassNotFoundException {
testScan(null, "app", "apo");
}
@Test
public void testScanOBBToOPP() throws IOException, InterruptedException,
ClassNotFoundException {
testScan("obb", "opp", "opo");
}
@Test
public void testScanYZYToEmpty() throws IOException, InterruptedException,
ClassNotFoundException {
testScan("yzy", null, "zzz");
}
/**
* Tests a MR scan using specific start and stop rows.
*
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
private void testScan(String start, String stop, String last)
throws IOException, InterruptedException, ClassNotFoundException {
String jobName =
"Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
(stop != null ? stop.toUpperCase() : "Empty");
LOG.info("Before map/reduce startup - job " + jobName);
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
c.set(KEY_STARTROW, start != null ? start : "");
c.set(KEY_LASTROW, last != null ? last : "");
List<Scan> scans = new ArrayList<Scan>();
for (String tableName : TABLES) {
Scan scan = new Scan();
scan.addFamily(INPUT_FAMILY);
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
if (start != null) {
scan.setStartRow(Bytes.toBytes(start));
}
if (stop != null) {
scan.setStopRow(Bytes.toBytes(stop));
}
scans.add(scan);
LOG.info("scan before: " + scan);
}
runJob(jobName, c, scans);
}
protected void runJob(String jobName, Configuration c, List<Scan> scans)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job(c, jobName);
initJob(scans, job);
job.setReducerClass(ScanReducer.class);
job.setNumReduceTasks(1); // one to get final "first" and "last" key
FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
LOG.info("Started " + job.getJobName());
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
LOG.info("After map/reduce completion - job " + jobName);
}
protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@Category({ LargeTests.class })
public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
protected Path restoreDir;
@BeforeClass
public static void setUpSnapshots() throws Exception {
TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
// take a snapshot of every table we have.
for (String tableName : TABLES) {
SnapshotTestingUtils
.createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
TEST_UTIL.getTestFileSystem(), true);
}
}
@Before
public void setUp() throws Exception {
this.restoreDir = new Path("/tmp");
}
@Override
protected void initJob(List<Scan> scans, Job job) throws IOException {
TableMapReduceUtil
.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
}
protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
return Multimaps.index(scans, new Function<Scan, String>() {
@Nullable
@Override
public String apply(Scan input) {
return snapshotNameForTable(
Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
}
}).asMap();
}
public static String snapshotNameForTable(String tableName) {
return tableName + "_snapshot";
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
@Category({ SmallTests.class })
public class TestMultiTableSnapshotInputFormatImpl {
private MultiTableSnapshotInputFormatImpl subject;
private Map<String, Collection<Scan>> snapshotScans;
private Path restoreDir;
private Configuration conf;
private Path rootDir;
@Before
public void setUp() throws Exception {
this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
// mock out restoreSnapshot
// TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper
// dependency into the
// input format. However, we need a new RestoreSnapshotHelper per snapshot in the current
// design, and it *also*
// feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would
// probably be the more "pure"
// way of doing things. This is the lesser of two evils, perhaps?
doNothing().when(this.subject).
restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
any(Path.class), any(FileSystem.class));
this.conf = new Configuration();
this.rootDir = new Path("file:///test-root-dir");
FSUtils.setRootDir(conf, rootDir);
this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
}
public void callSetInput() throws IOException {
subject.setInput(this.conf, snapshotScans, restoreDir);
}
public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
Map<String, Collection<Scan>> snapshotScans) throws IOException {
Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
List<ScanWithEquals> scans = Lists.newArrayList();
for (Scan scan : entry.getValue()) {
scans.add(new ScanWithEquals(scan));
}
rtn.put(entry.getKey(), scans);
}
return rtn;
}
public static class ScanWithEquals {
private final String startRow;
private final String stopRow;
/**
* Creates a new instance of this class while copying all values.
*
* @param scan The scan instance to copy from.
* @throws java.io.IOException When copying the values fails.
*/
public ScanWithEquals(Scan scan) throws IOException {
this.startRow = Bytes.toStringBinary(scan.getStartRow());
this.stopRow = Bytes.toStringBinary(scan.getStopRow());
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ScanWithEquals)) {
return false;
}
ScanWithEquals otherScan = (ScanWithEquals) obj;
return Objects.equals(this.startRow, otherScan.startRow) && Objects
.equals(this.stopRow, otherScan.stopRow);
}
@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow)
.add("stopRow", stopRow).toString();
}
}
@Test
public void testSetInputSetsSnapshotToScans() throws Exception {
callSetInput();
Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
// convert to scans we can use .equals on
Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
assertEquals(expectedWithEquals, actualWithEquals);
}
@Test
public void testSetInputPushesRestoreDirectories() throws Exception {
callSetInput();
Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
}
@Test
public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
callSetInput();
Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
for (Path snapshotDir : restoreDirs.values()) {
assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
snapshotDir.getParent());
}
}
@Test
public void testSetInputRestoresSnapshots() throws Exception {
callSetInput();
Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
eq(entry.getValue()), any(FileSystem.class));
}
}
}

View File

@ -112,6 +112,6 @@ public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase {
@Test
public void testScanFromConfiguration()
throws IOException, InterruptedException, ClassNotFoundException {
testScanFromConfiguration("bba", "bbd", "bbc");
testScan("bba", "bbd", "bbc");
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@Category({ SmallTests.class })
public class TestConfigurationUtil {
private Configuration conf;
private Map<String, String> keyValues;
private String key;
@Before
public void setUp() throws Exception {
this.conf = new Configuration();
this.keyValues = ImmutableMap.of("k1", "v1", "k2", "v2");
this.key = "my_conf_key";
}
public void callSetKeyValues() {
ConfigurationUtil.setKeyValues(conf, key, keyValues.entrySet());
}
public List<Map.Entry<String, String>> callGetKeyValues() {
return ConfigurationUtil.getKeyValues(conf, key);
}
@Test
public void testGetAndSetKeyValuesWithValues() throws Exception {
callSetKeyValues();
assertEquals(Lists.newArrayList(this.keyValues.entrySet()), callGetKeyValues());
}
@Test
public void testGetKeyValuesWithUnsetKey() throws Exception {
assertNull(callGetKeyValues());
}
}