From 722fd17069a302f4de12c22212d54d80bed81aed Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 2 Jun 2015 19:29:15 -0700 Subject: [PATCH] HBASE-13356 HBase should provide an InputFormat supporting multiple scans in mapreduce jobs over snapshots (Andrew Mains) --- .../mapred/MultiTableSnapshotInputFormat.java | 130 +++++++++ .../hbase/mapred/TableMapReduceUtil.java | 53 +++- .../mapred/TableSnapshotInputFormat.java | 10 +- .../MultiTableSnapshotInputFormat.java | 108 +++++++ .../MultiTableSnapshotInputFormatImpl.java | 252 ++++++++++++++++ .../hbase/mapreduce/TableMapReduceUtil.java | 69 ++++- .../mapreduce/TableSnapshotInputFormat.java | 25 +- .../TableSnapshotInputFormatImpl.java | 120 +++++--- .../hadoop/hbase/util/ConfigurationUtil.java | 125 ++++++++ .../TestMultiTableSnapshotInputFormat.java | 135 +++++++++ .../MultiTableInputFormatTestBase.java | 274 ++++++++++++++++++ .../mapreduce/TestMultiTableInputFormat.java | 216 +------------- .../TestMultiTableSnapshotInputFormat.java | 93 ++++++ ...TestMultiTableSnapshotInputFormatImpl.java | 185 ++++++++++++ .../hbase/util/TestConfigurationUtil.java | 68 +++++ 15 files changed, 1576 insertions(+), 287 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java new file mode 100644 index 00000000000..ab27edd605c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred + * .TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans + * configured for each. + * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} for + * more details. + * Usage is similar to TableSnapshotInputFormat, with the following exception: + * initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding + * scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables + * included in each snapshot/scan + * pair. + * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, + * Class, Class, Class, JobConf, boolean, Path)} + * can be used to configure the job. + *
+ * <pre>{@code
+ * JobConf job = new JobConf(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp + * directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * (one per region). + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on + * permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat + implements InputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List splits = delegate.getSplits(job); + InputSplit[] results = new InputSplit[splits.size()]; + for (int i = 0; i < splits.size(); i++) { + results[i] = new TableSnapshotRegionSplit(splits.get(i)); + } + return results; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); + } + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of + * restoreDir. + * Sets: {@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}, + * {@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY} + * + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public static void setInput(Configuration conf, Map> snapshotScans, + Path restoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index b5fefbb7413..84a279d974e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -18,37 +18,32 @@ */ package org.apache.hadoop.hbase.mapred; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; 
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.security.token.Token; -import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; /** * Utility for {@link TableMap} and {@link TableReduce} @@ -127,6 +122,40 @@ public class TableMapReduceUtil { } } + /** + * Sets up the job for reading from one or more multiple table snapshots, with one or more scans + * per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, Class outputKeyClass, Class outputValueClass, + JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir); + + job.setInputFormat(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + if (addDependencyJars) { + addDependencyJars(job); + } + + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. 
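For reference, a minimal driver sketch showing how the new JobConf-based initMultiTableSnapshotMapperJob above would be invoked. The driver class, MyTableMap, the snapshot names, and the row ranges are hypothetical placeholders, not part of this patch; output key/value classes mirror the ones used in the patch's own mapred test.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collection;
import java.util.Map;

public class MultiSnapshotScanDriver {  // hypothetical driver, not part of this patch
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setJobName("multi-snapshot-scan");

    // One or more scans per snapshot; snapshot names and row ranges are illustrative only.
    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
        "snapshot1", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
        "snapshot2", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

    // Snapshots are restored into per-snapshot subdirectories of this path.
    Path restoreDir = new Path("/tmp/snapshot_restore_dir");

    // MyTableMap is a hypothetical TableMap<ImmutableBytesWritable, ImmutableBytesWritable>.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(
        snapshotScans, MyTableMap.class, ImmutableBytesWritable.class,
        ImmutableBytesWritable.class, job, true, restoreDir);

    job.setNumReduceTasks(0);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/multi_snapshot_scan_out"));
    JobClient.runJob(job);
  }
}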
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java index 1c5e4bd2af9..a5c62b2c8d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -18,12 +18,13 @@ package org.apache.hadoop.hbase.mapred; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; import org.apache.hadoop.mapred.InputFormat; @@ -60,8 +61,9 @@ public class TableSnapshotInputFormat implements InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java new file mode 100644 index 00000000000..bd530c8e3b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes + * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans + * configured for each. 
+ * Internally, the input format delegates to + * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * and thus has the same performance advantages; + * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for + * more details. + * Usage is similar to TableSnapshotInputFormat, with the following exception: + * initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding + * scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables + * included in each snapshot/scan + * pair. + * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob + * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache + * .hadoop.fs.Path)} + * can be used to configure the job. + *
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp + * directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * (one per region). + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on + * permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + List splits = + delegate.getSplits(jobContext.getConfiguration()); + List rtn = Lists.newArrayListWithCapacity(splits.size()); + + for (TableSnapshotInputFormatImpl.InputSplit split : splits) { + rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split)); + } + + return rtn; + } + + public static void setInput(Configuration configuration, + Map> snapshotScans, Path tmpRestoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java new file mode 100644 index 00000000000..e9ce5a3f0a2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.ConfigurationUtil; +import org.apache.hadoop.hbase.util.FSUtils; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Shared implementation of mapreduce code over multiple table snapshots. + * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormat} and mapred + * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. + */ +@InterfaceAudience.LimitedPrivate({ "HBase" }) +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormatImpl { + + private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class); + + public static final String RESTORE_DIRS_KEY = + "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; + public static final String SNAPSHOT_TO_SCANS_KEY = + "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of + * restoreDir. 
+ * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} + * + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public void setInput(Configuration conf, Map> snapshotScans, + Path restoreDir) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + setSnapshotToScans(conf, snapshotScans); + Map restoreDirs = + generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir); + setSnapshotDirs(conf, restoreDirs); + restoreSnapshots(conf, restoreDirs, fs); + } + + /** + * Return the list of splits extracted from the scans/snapshots pushed to conf by + * {@link + * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)} + * + * @param conf Configuration to determine splits from + * @return Return the list of splits extracted from the scans/snapshots pushed to conf + * @throws IOException + */ + public List getSplits(Configuration conf) + throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + List rtn = Lists.newArrayList(); + + Map> snapshotsToScans = getSnapshotsToScans(conf); + Map snapshotsToRestoreDirs = getSnapshotDirs(conf); + for (Map.Entry> entry : snapshotsToScans.entrySet()) { + String snapshotName = entry.getKey(); + + Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); + + SnapshotManifest manifest = + TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); + List regionInfos = + TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); + + for (Scan scan : entry.getValue()) { + List splits = + TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf); + rtn.addAll(splits); + } + } + return rtn; + } + + /** + * Retrieve the snapshot name -> list mapping pushed to configuration by + * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)} + * + * @param conf Configuration to extract name -> list mappings from. 
+ * @return the snapshot name -> list mapping pushed to configuration + * @throws IOException + */ + public Map> getSnapshotsToScans(Configuration conf) throws IOException { + + Map> rtn = Maps.newHashMap(); + + for (Map.Entry entry : ConfigurationUtil + .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) { + String snapshotName = entry.getKey(); + String scan = entry.getValue(); + + Collection snapshotScans = rtn.get(snapshotName); + if (snapshotScans == null) { + snapshotScans = Lists.newArrayList(); + rtn.put(snapshotName, snapshotScans); + } + + snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); + } + + return rtn; + } + + /** + * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) + * + * @param conf + * @param snapshotScans + * @throws IOException + */ + public void setSnapshotToScans(Configuration conf, Map> snapshotScans) + throws IOException { + // flatten out snapshotScans for serialization to the job conf + List> snapshotToSerializedScans = Lists.newArrayList(); + + for (Map.Entry> entry : snapshotScans.entrySet()) { + String snapshotName = entry.getKey(); + Collection scans = entry.getValue(); + + // serialize all scans and map them to the appropriate snapshot + for (Scan scan : scans) { + snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName, + TableMapReduceUtil.convertScanToString(scan))); + } + } + + ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); + } + + /** + * Retrieve the directories into which snapshots have been restored from + * ({@link #RESTORE_DIRS_KEY}) + * + * @param conf Configuration to extract restore directories from + * @return the directories into which snapshots have been restored from + * @throws IOException + */ + public Map getSnapshotDirs(Configuration conf) throws IOException { + List> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY); + Map rtn = Maps.newHashMapWithExpectedSize(kvps.size()); + + for (Map.Entry kvp : kvps) { + rtn.put(kvp.getKey(), new Path(kvp.getValue())); + } + + return rtn; + } + + public void setSnapshotDirs(Configuration conf, Map snapshotDirs) { + Map toSet = Maps.newHashMap(); + + for (Map.Entry entry : snapshotDirs.entrySet()) { + toSet.put(entry.getKey(), entry.getValue().toString()); + } + + ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); + } + + /** + * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and + * return a map from the snapshot to the restore directory. 
+ * + * @param snapshots collection of snapshot names to restore + * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored + * @return a mapping from snapshot name to the directory in which that snapshot has been restored + */ + private Map generateSnapshotToRestoreDirMapping(Collection snapshots, + Path baseRestoreDir) { + Map rtn = Maps.newHashMap(); + + for (String snapshotName : snapshots) { + Path restoreSnapshotDir = + new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); + rtn.put(snapshotName, restoreSnapshotDir); + } + + return rtn; + } + + /** + * Restore each (snapshot name, restore directory) pair in snapshotToDir + * + * @param conf configuration to restore with + * @param snapshotToDir mapping from snapshot names to restore directories + * @param fs filesystem to do snapshot restoration on + * @throws IOException + */ + public void restoreSnapshots(Configuration conf, Map snapshotToDir, FileSystem fs) + throws IOException { + // TODO: restore from record readers to parallelize. + Path rootDir = FSUtils.getRootDir(conf); + + for (Map.Entry entry : snapshotToDir.entrySet()) { + String snapshotName = entry.getKey(); + Path restoreDir = entry.getValue(); + LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir + + " for MultiTableSnapshotInputFormat"); + restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); + } + } + + void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, + FileSystem fs) throws IOException { + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 7d793ab27bb..769c40bce32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -18,20 +18,8 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; - +import com.google.protobuf.InvalidProtocolBufferException; +import com.yammer.metrics.core.MetricsRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -62,6 +50,21 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + /** * Utility for {@link TableMapper} and {@link TableReducer} */ @@ -305,6 +308,44 @@ public class TableMapReduceUtil { conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); } + /** + * Sets up the job for reading from one or more table snapshots, with one or more scans + * per 
snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, Class outputKeyClass, Class outputValueClass, + Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); + + job.setInputFormatClass(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + + if (addDependencyJars) { + addDependencyJars(job); + addDependencyJars(job.getConfiguration(), MetricsRegistry.class); + } + + resetCacheConfig(job.getConfiguration()); + } + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index 44d88c5d7bf..c40396f140e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -18,19 +18,14 @@ package org.apache.hadoop.hbase.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.Writable; @@ -41,7 +36,12 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.google.common.annotations.VisibleForTesting; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; /** * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. 
The job @@ -98,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 8496868d03f..1dfbfd3c7d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -18,8 +18,9 @@ package org.apache.hadoop.hbase.mapreduce; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.ClientSideRegionScanner; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Result; @@ -61,9 +64,11 @@ public class TableSnapshotInputFormatImpl { // TODO: Snapshots files are owned in fs by the hbase user. There is no // easy way to delegate access. + public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class); + private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; // key for specifying the root dir of the restored snapshot - private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; + protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ private static final String LOCALITY_CUTOFF_MULTIPLIER = @@ -74,14 +79,18 @@ public class TableSnapshotInputFormatImpl { * Implementation class for InputSplit logic common between mapred and mapreduce. */ public static class InputSplit implements Writable { + private HTableDescriptor htd; private HRegionInfo regionInfo; private String[] locations; + private String scan; + private String restoreDir; // constructor for mapreduce framework / Writable public InputSplit() {} - public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations) { + public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations, + Scan scan, Path restoreDir) { this.htd = htd; this.regionInfo = regionInfo; if (locations == null || locations.isEmpty()) { @@ -89,6 +98,25 @@ public class TableSnapshotInputFormatImpl { } else { this.locations = locations.toArray(new String[locations.size()]); } + try { + this.scan = scan != null ? 
TableMapReduceUtil.convertScanToString(scan) : ""; + } catch (IOException e) { + LOG.warn("Failed to convert Scan to String", e); + } + + this.restoreDir = restoreDir.toString(); + } + + public HTableDescriptor getHtd() { + return htd; + } + + public String getScan() { + return scan; + } + + public String getRestoreDir() { + return restoreDir; } public long getLength() { @@ -128,6 +156,10 @@ public class TableSnapshotInputFormatImpl { byte[] buf = baos.toByteArray(); out.writeInt(buf.length); out.write(buf); + + Bytes.writeByteArray(out, Bytes.toBytes(scan)); + Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); + } @Override @@ -140,6 +172,9 @@ public class TableSnapshotInputFormatImpl { this.regionInfo = HRegionInfo.convert(split.getRegion()); List locationsList = split.getLocationsList(); this.locations = locationsList.toArray(new String[locationsList.size()]); + + this.scan = Bytes.toString(Bytes.readByteArray(in)); + this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); } } @@ -158,28 +193,12 @@ public class TableSnapshotInputFormatImpl { } public void initialize(InputSplit split, Configuration conf) throws IOException { + this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); this.split = split; HTableDescriptor htd = split.htd; HRegionInfo hri = this.split.getRegionInfo(); FileSystem fs = FSUtils.getCurrentFileSystem(conf); - Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root - // directory where snapshot was restored - - // create scan - // TODO: mapred does not support scan as input API. Work around for now. - if (conf.get(TableInputFormat.SCAN) != null) { - scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); - } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { - String[] columns = - conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); - scan = new Scan(); - for (String col : columns) { - scan.addFamily(Bytes.toBytes(col)); - } - } else { - throw new IllegalArgumentException("A Scan is not configured for this job"); - } // region is immutable, this should be fine, // otherwise we have to set the thread read point @@ -187,7 +206,8 @@ public class TableSnapshotInputFormatImpl { // disable caching of data blocks scan.setCacheBlocks(false); - scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); + scanner = + new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); } public boolean nextKeyValue() throws IOException { @@ -233,18 +253,40 @@ public class TableSnapshotInputFormatImpl { Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); - Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); - SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); - SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); + + List regionInfos = getRegionInfosFromManifest(manifest); + + // TODO: mapred does not support scan as input API. Work around for now. 
+ Scan scan = extractScanFromConf(conf); + // the temp dir where the snapshot is restored + Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + + return getSplits(scan, manifest, regionInfos, restoreDir, conf); + } + + public static List getRegionInfosFromManifest(SnapshotManifest manifest) { List regionManifests = manifest.getRegionManifests(); if (regionManifests == null) { throw new IllegalArgumentException("Snapshot seems empty"); } - // load table descriptor - HTableDescriptor htd = manifest.getTableDescriptor(); + List regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); - // TODO: mapred does not support scan as input API. Work around for now. + for (SnapshotRegionManifest regionManifest : regionManifests) { + regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo())); + } + return regionInfos; + } + + public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, + Path rootDir, FileSystem fs) throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + } + + public static Scan extractScanFromConf(Configuration conf) throws IOException { Scan scan = null; if (conf.get(TableInputFormat.SCAN) != null) { scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); @@ -258,29 +300,35 @@ public class TableSnapshotInputFormatImpl { } else { throw new IllegalArgumentException("Unable to create scan"); } - // the temp dir where the snapshot is restored - Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + return scan; + } + + public static List getSplits(Scan scan, SnapshotManifest manifest, + List regionManifests, Path restoreDir, Configuration conf) throws IOException { + // load table descriptor + HTableDescriptor htd = manifest.getTableDescriptor(); + Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); List splits = new ArrayList(); - for (SnapshotRegionManifest regionManifest : regionManifests) { + for (HRegionInfo hri : regionManifests) { // load region descriptor - HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); - if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), + hri.getEndKey())) { // compute HDFS locations from snapshot files (which will get the locations for // referred hfiles) List hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); int len = Math.min(3, hosts.size()); hosts = hosts.subList(0, len); - splits.add(new InputSplit(htd, hri, hosts)); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } return splits; + } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java new file mode 100644 index 00000000000..efb3170e07f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * Utilities for storing more complex collection types in + * {@link org.apache.hadoop.conf.Configuration} instances. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class ConfigurationUtil { + // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet, + // nor is it valid for paths + public static final char KVP_DELIMITER = '^'; + + // Disallow instantiation + private ConfigurationUtil() { + + } + + /** + * Store a collection of Map.Entry's in conf, with each entry separated by ',' + * and key values delimited by {@link #KVP_DELIMITER} + * + * @param conf configuration to store the collection in + * @param key overall key to store keyValues under + * @param keyValues kvps to be stored under key in conf + */ + public static void setKeyValues(Configuration conf, String key, + Collection> keyValues) { + setKeyValues(conf, key, keyValues, KVP_DELIMITER); + } + + /** + * Store a collection of Map.Entry's in conf, with each entry separated by ',' + * and key values delimited by delimiter. + * + * @param conf configuration to store the collection in + * @param key overall key to store keyValues under + * @param keyValues kvps to be stored under key in conf + * @param delimiter character used to separate each kvp + */ + public static void setKeyValues(Configuration conf, String key, + Collection> keyValues, char delimiter) { + List serializedKvps = Lists.newArrayList(); + + for (Map.Entry kvp : keyValues) { + serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue()); + } + + conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()])); + } + + /** + * Retrieve a list of key value pairs from configuration, stored under the provided key + * + * @param conf configuration to retrieve kvps from + * @param key key under which the key values are stored + * @return the list of kvps stored under key in conf, or null if the key isn't present. 
+ * @see #setKeyValues(Configuration, String, Collection, char) + */ + public static List> getKeyValues(Configuration conf, String key) { + return getKeyValues(conf, key, KVP_DELIMITER); + } + + /** + * Retrieve a list of key value pairs from configuration, stored under the provided key + * + * @param conf configuration to retrieve kvps from + * @param key key under which the key values are stored + * @param delimiter character used to separate each kvp + * @return the list of kvps stored under key in conf, or null if the key isn't present. + * @see #setKeyValues(Configuration, String, Collection, char) + */ + public static List> getKeyValues(Configuration conf, String key, + char delimiter) { + String[] kvps = conf.getStrings(key); + + if (kvps == null) { + return null; + } + + List> rtn = Lists.newArrayList(); + + for (String kvp : kvps) { + String[] splitKvp = StringUtils.split(kvp, delimiter); + + if (splitKvp.length != 2) { + throw new IllegalArgumentException( + "Expected key value pair for configuration key '" + key + "'" + " to be of form '" + + delimiter + "; was " + kvp + " instead"); + } + + rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1])); + } + return rtn; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 00000000000..915a35d2387 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapred; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@Category({ VerySlowMapReduceTests.class, LargeTests.class }) +public class TestMultiTableSnapshotInputFormat + extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat { + + private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class); + + @Override + protected void runJob(String jobName, Configuration c, List scans) + throws IOException, InterruptedException, ClassNotFoundException { + JobConf job = new JobConf(TEST_UTIL.getConfiguration()); + + job.setJobName(jobName); + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); + + TableMapReduceUtil.addDependencyJars(job); + + job.setReducerClass(Reducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + + RunningJob runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + assertTrue(runningJob.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper + implements TableMap { + + @Override + public void map(ImmutableBytesWritable key, Result value, + OutputCollector outputCollector, + Reporter reporter) throws IOException { + makeAssertions(key, value); + outputCollector.collect(key, key); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf jobConf) { + + } + } + + public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements + org.apache.hadoop.mapred.Reducer { + + private JobConf jobConf; + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector outputCollector, Reporter reporter) + throws IOException { + makeAssertions(key, Lists.newArrayList(values)); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. 
+ * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + super.cleanup(this.jobConf); + } + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java new file mode 100644 index 00000000000..6129b265337 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base set of tests and setup for input formats touching multiple tables. 
+ */ +public abstract class MultiTableInputFormatTestBase { + static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); + public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final String TABLE_NAME = "scantest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + static List TABLES = Lists.newArrayList(); + + static { + for (int i = 0; i < 3; i++) { + TABLES.add(TABLE_NAME + String.valueOf(i)); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + for (String tableName : TABLES) { + try (HTable table = + TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), + INPUT_FAMILY, 4)) { + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + } + } + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); + } + + /** + * Pass the key and value to reducer. + */ + public static class ScanMapper extends + TableMapper { + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + makeAssertions(key, value); + context.write(key, key); + } + + public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> cf = + value.getMap(); + if (!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + } + } + + /** + * Checks the last and first keys seen against the scanner boundaries. 
+ */ + public static class ScanReducer + extends + Reducer { + private String first = null; + private String last = null; + + @Override + protected void reduce(ImmutableBytesWritable key, + Iterable values, Context context) + throws IOException, InterruptedException { + makeAssertions(key, values); + } + + protected void makeAssertions(ImmutableBytesWritable key, + Iterable values) { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.debug("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + assertEquals(3, count); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + Configuration c = context.getConfiguration(); + cleanup(c); + } + + protected void cleanup(Configuration c) { + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + } + + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + @Test + public void testScanOBBToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + @Test + public void testScanYZYToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = + "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? 
last : ""); + + List scans = new ArrayList(); + + for (String tableName : TABLES) { + Scan scan = new Scan(); + + scan.addFamily(INPUT_FAMILY); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); + + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + + scans.add(scan); + + LOG.info("scan before: " + scan); + } + + runJob(jobName, c, scans); + } + + protected void runJob(String jobName, Configuration c, List scans) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = new Job(c, jobName); + + initJob(scans, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + protected abstract void initJob(List scans, Job job) throws IOException; + + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java index f2827acfa35..1bfd14a280b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java @@ -17,236 +17,34 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.After; -import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.util.List; + /** * Tests various scan start and stop row scenarios. This is set in a scan and * tested in a MapReduce job to see if that is handed over and done properly * too. 
*/ @Category({VerySlowMapReduceTests.class, LargeTests.class}) -public class TestMultiTableInputFormat { - - private static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - static final String TABLE_NAME = "scantest"; - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final String KEY_STARTROW = "startRow"; - static final String KEY_LASTROW = "stpRow"; +public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase { @BeforeClass - public static void setUpBeforeClass() throws Exception { - // switch TIF to log at DEBUG level + public static void setupLogging() { TEST_UTIL.enableDebug(MultiTableInputFormat.class); - TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); - // start mini hbase cluster - TEST_UTIL.startMiniCluster(3); - // create and fill table - for (int i = 0; i < 3; i++) { - try (HTable table = - TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), - INPUT_FAMILY, 4)) { - TEST_UTIL.loadTable(table, INPUT_FAMILY, false); - } - } - // start MR cluster - TEST_UTIL.startMiniMapReduceCluster(); } - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniMapReduceCluster(); - TEST_UTIL.shutdownMiniCluster(); - } - - @After - public void tearDown() throws Exception { - Configuration c = TEST_UTIL.getConfiguration(); - FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); - } - - /** - * Pass the key and value to reducer. - */ - public static class ScanMapper extends - TableMapper { - /** - * Pass the key and value to reduce. - * - * @param key The key, here "aaa", "aab" etc. - * @param value The value is the same as the key. - * @param context The task context. - * @throws IOException When reading the rows fails. - */ - @Override - public void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> cf = - value.getMap(); - if (!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); - LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + - ", value -> " + val); - context.write(key, key); - } - } - - /** - * Checks the last and first keys seen against the scanner boundaries. 
- */ - public static class ScanReducer - extends - Reducer { - private String first = null; - private String last = null; - - @Override - protected void reduce(ImmutableBytesWritable key, - Iterable values, Context context) - throws IOException, InterruptedException { - int count = 0; - for (ImmutableBytesWritable value : values) { - String val = Bytes.toStringBinary(value.get()); - LOG.debug("reduce: key[" + count + "] -> " + - Bytes.toStringBinary(key.get()) + ", value -> " + val); - if (first == null) first = val; - last = val; - count++; - } - assertEquals(3, count); - } - - @Override - protected void cleanup(Context context) throws IOException, - InterruptedException { - Configuration c = context.getConfiguration(); - String startRow = c.get(KEY_STARTROW); - String lastRow = c.get(KEY_LASTROW); - LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + - startRow + "\""); - LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + - "\""); - if (startRow != null && startRow.length() > 0) { - assertEquals(startRow, first); - } - if (lastRow != null && lastRow.length() > 0) { - assertEquals(lastRow, last); - } - } - } - - @Test - public void testScanEmptyToEmpty() throws IOException, InterruptedException, - ClassNotFoundException { - testScan(null, null, null); - } - - @Test - public void testScanEmptyToAPP() throws IOException, InterruptedException, - ClassNotFoundException { - testScan(null, "app", "apo"); - } - - @Test - public void testScanOBBToOPP() throws IOException, InterruptedException, - ClassNotFoundException { - testScan("obb", "opp", "opo"); - } - - @Test - public void testScanYZYToEmpty() throws IOException, InterruptedException, - ClassNotFoundException { - testScan("yzy", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScan(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = - "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + - (stop != null ? stop.toUpperCase() : "Empty"); - LOG.info("Before map/reduce startup - job " + jobName); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - - c.set(KEY_STARTROW, start != null ? start : ""); - c.set(KEY_LASTROW, last != null ? 
last : ""); - - List scans = new ArrayList(); - - for(int i=0; i<3; i++){ - Scan scan = new Scan(); - - scan.addFamily(INPUT_FAMILY); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i)); - - if (start != null) { - scan.setStartRow(Bytes.toBytes(start)); - } - if (stop != null) { - scan.setStopRow(Bytes.toBytes(stop)); - } - - scans.add(scan); - - LOG.info("scan before: " + scan); - } - - Job job = new Job(c, jobName); - + @Override + protected void initJob(List scans, Job job) throws IOException { TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - job.setReducerClass(ScanReducer.class); - job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - LOG.info("Started " + job.getJobName()); - job.waitForCompletion(true); - assertTrue(job.isSuccessful()); - LOG.info("After map/reduce completion - job " + jobName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 00000000000..6285ca18997 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimaps;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+  protected Path restoreDir;
+
+  @BeforeClass
+  public static void setUpSnapshots() throws Exception {
+
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+    // take a snapshot of every table we have.
+    for (String tableName : TABLES) {
+      SnapshotTestingUtils
+          .createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
+              ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
+              snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+              TEST_UTIL.getTestFileSystem(), true);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.restoreDir = new Path("/tmp");
+
+  }
+
+  @Override
+  protected void initJob(List<Scan> scans, Job job) throws IOException {
+    TableMapReduceUtil
+        .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
+            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+  }
+
+  protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
+    return Multimaps.index(scans, new Function<Scan, String>() {
+      @Nullable
+      @Override
+      public String apply(Scan input) {
+        return snapshotNameForTable(
+            Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+      }
+    }).asMap();
+  }
+
+  public static String snapshotNameForTable(String tableName) {
+    return tableName + "_snapshot";
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 00000000000..b4b8056fe60
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({ SmallTests.class })
+public class TestMultiTableSnapshotInputFormatImpl {
+
+  private MultiTableSnapshotInputFormatImpl subject;
+  private Map<String, Collection<Scan>> snapshotScans;
+  private Path restoreDir;
+  private Configuration conf;
+  private Path rootDir;
+
+  @Before
+  public void setUp() throws Exception {
+    this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+    // mock out restoreSnapshot
+    // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper
+    // dependency into the
+    // input format. However, we need a new RestoreSnapshotHelper per snapshot in the current
+    // design, and it *also*
+    // feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would
+    // probably be the more "pure"
+    // way of doing things. This is the lesser of two evils, perhaps?
+    doNothing().when(this.subject).
+        restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
+            any(Path.class), any(FileSystem.class));
+
+    this.conf = new Configuration();
+    this.rootDir = new Path("file:///test-root-dir");
+    FSUtils.setRootDir(conf, rootDir);
+    this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
+        ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
+        ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+            new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
+
+    this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+  }
+
+  public void callSetInput() throws IOException {
+    subject.setInput(this.conf, snapshotScans, restoreDir);
+  }
+
+  public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
+      Map<String, Collection<Scan>> snapshotScans) throws IOException {
+    Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+      List<ScanWithEquals> scans = Lists.newArrayList();
+
+      for (Scan scan : entry.getValue()) {
+        scans.add(new ScanWithEquals(scan));
+      }
+      rtn.put(entry.getKey(), scans);
+    }
+
+    return rtn;
+  }
+
+  public static class ScanWithEquals {
+
+    private final String startRow;
+    private final String stopRow;
+
+    /**
+     * Creates a new instance of this class while copying all values.
+     *
+     * @param scan The scan instance to copy from.
+     * @throws java.io.IOException When copying the values fails.
+     */
+    public ScanWithEquals(Scan scan) throws IOException {
+      this.startRow = Bytes.toStringBinary(scan.getStartRow());
+      this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof ScanWithEquals)) {
+        return false;
+      }
+      ScanWithEquals otherScan = (ScanWithEquals) obj;
+      return Objects.equals(this.startRow, otherScan.startRow) && Objects
+          .equals(this.stopRow, otherScan.stopRow);
+    }
+
+    @Override
+    public String toString() {
+      return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow)
+          .add("stopRow", stopRow).toString();
+    }
+  }
+
+  @Test
+  public void testSetInputSetsSnapshotToScans() throws Exception {
+
+    callSetInput();
+
+    Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+    // convert to scans we can use .equals on
+    Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+    Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+    assertEquals(expectedWithEquals, actualWithEquals);
+  }
+
+  @Test
+  public void testSetInputPushesRestoreDirectories() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+  }
+
+  @Test
+  public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    for (Path snapshotDir : restoreDirs.values()) {
+      assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
+          snapshotDir.getParent());
+    }
+  }
+
+  @Test
+  public void testSetInputRestoresSnapshots() throws Exception {
+    callSetInput();
+
+    Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+      verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
+          eq(entry.getValue()), any(FileSystem.class));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java
new file mode 100644
index 00000000000..a9ecf9ed0ee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category({ SmallTests.class })
+public class TestConfigurationUtil {
+
+  private Configuration conf;
+  private Map<String, String> keyValues;
+  private String key;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    this.keyValues = ImmutableMap.of("k1", "v1", "k2", "v2");
+    this.key = "my_conf_key";
+  }
+
+  public void callSetKeyValues() {
+    ConfigurationUtil.setKeyValues(conf, key, keyValues.entrySet());
+  }
+
+  public List<Map.Entry<String, String>> callGetKeyValues() {
+    return ConfigurationUtil.getKeyValues(conf, key);
+  }
+
+  @Test
+  public void testGetAndSetKeyValuesWithValues() throws Exception {
+    callSetKeyValues();
+    assertEquals(Lists.newArrayList(this.keyValues.entrySet()), callGetKeyValues());
+  }
+
+  @Test
+  public void testGetKeyValuesWithUnsetKey() throws Exception {
+    assertNull(callGetKeyValues());
+  }
+
+}
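
Reviewer note (not part of the patch): the new ConfigurationUtil helper exercised by TestConfigurationUtil above stores a collection of string key/value pairs under a single configuration key and reads them back as a List of Map.Entry, returning null when the key was never set. A minimal round-trip sketch, using only the two calls the test makes; the key name and values below are arbitrary:

  import com.google.common.collect.ImmutableMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.util.ConfigurationUtil;

  import java.util.List;
  import java.util.Map;

  public class ConfigurationUtilRoundTrip {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");

      // pack the entries under one configuration key ...
      ConfigurationUtil.setKeyValues(conf, "my_conf_key", props.entrySet());

      // ... and read them back in the same order, as the test asserts
      List<Map.Entry<String, String>> roundTripped =
          ConfigurationUtil.getKeyValues(conf, "my_conf_key");
      System.out.println(roundTripped);
    }
  }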
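
Reviewer note (not part of the patch): for anyone trying the feature end to end, below is an illustrative driver sketch assembled only from calls the new tests make (initMultiTableSnapshotMapperJob with a map of snapshot name to scans, plus a restore directory). The snapshot names, column family, mapper, output format, and restore path are placeholders, and the snapshots are assumed to already exist, as TestMultiTableSnapshotInputFormat creates them before running its scans.

  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableMapper;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

  import java.io.IOException;
  import java.util.Collection;
  import java.util.Map;

  public class MultiSnapshotScanDriver {

    // Placeholder mapper: emits the row key, much like ScanMapper in the tests above.
    public static class KeyMapper
        extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        context.write(key, key);
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = new Job(conf, "multi-snapshot-scan");

      // One or more scans per snapshot; the snapshot names here are made up.
      Scan wholeTable = new Scan();
      wholeTable.addFamily(Bytes.toBytes("contents"));
      Map<String, Collection<Scan>> snapshotScans =
          ImmutableMap.<String, Collection<Scan>>of(
              "scantest0_snapshot", ImmutableList.<Scan>of(wholeTable),
              "scantest1_snapshot",
              ImmutableList.<Scan>of(new Scan(Bytes.toBytes("obb"), Bytes.toBytes("opp"))));

      // Root directory under which the input format restores one directory per snapshot
      // (see TestMultiTableSnapshotInputFormatImpl above).
      Path restoreDir = new Path("/tmp/multi-snapshot-restore");

      TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, KeyMapper.class,
          ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

      job.setNumReduceTasks(0);
      job.setOutputFormatClass(NullOutputFormat.class);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }

As in the tests, each snapshot can carry several scans, and the overall input is the concatenation of every snapshot/scan pair; the input format materializes each snapshot under its own subdirectory of the given restore root.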