diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
new file mode 100644
index 00000000000..ab27edd605c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred
+ * .TableSnapshotInputFormat}
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
+ * configured for each.
+ * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce
+ * .TableSnapshotInputFormat}
+ * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce
+ * .TableSnapshotInputFormat} for
+ * more details.
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
+ * scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables
+ * included in each snapshot/scan
+ * pair.
+ * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
+ * Class, Class, Class, JobConf, boolean, Path)}
+ * can be used to configure the job.
+ *
+ * {@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ * MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ *
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp
+ * directory. Input splits and
+ * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
+ * .TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
+ * permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
+ implements InputFormat<ImmutableBytesWritable, Result> {
+
+ private final MultiTableSnapshotInputFormatImpl delegate;
+
+ public MultiTableSnapshotInputFormat() {
+ this.delegate = new MultiTableSnapshotInputFormatImpl();
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
+ InputSplit[] results = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new TableSnapshotRegionSplit(splits.get(i));
+ }
+ return results;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+ }
+
+ /**
+ * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
+ * restoreDir.
+ * Sets: {@link org.apache.hadoop.hbase.mapreduce
+ * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
+ * {@link org.apache.hadoop.hbase.mapreduce
+ * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
+ *
+ * @param conf
+ * @param snapshotScans
+ * @param restoreDir
+ * @throws IOException
+ */
+ public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
+ Path restoreDir) throws IOException {
+ new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index b5fefbb7413..84a279d974e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -18,37 +18,32 @@
*/
package org.apache.hadoop.hbase.mapred;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
/**
* Utility for {@link TableMap} and {@link TableReduce}
@@ -127,6 +122,40 @@ public class TableMapReduceUtil {
}
}
+ /**
+ * Sets up the job for reading from one or more table snapshots, with one or more scans
+ * per snapshot.
+ * It bypasses hbase servers and reads directly from snapshot files.
+ *
+ * @param snapshotScans map of snapshot name to scans on that snapshot.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ */
+ public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+ Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+ JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
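+ // setInput serializes the snapshot -> scans mapping into the job conf and restores each snapshot under tmpRestoreDir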
+ MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+ job.setInputFormat(MultiTableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+ }
+
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
index 1c5e4bd2af9..a5c62b2c8d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -18,12 +18,13 @@
package org.apache.hadoop.hbase.mapred;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
@@ -60,8 +61,9 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result>
- List<String> locations) {
- this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+ List<String> locations, Scan scan, Path restoreDir) {
+ this.delegate =
+ new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 00000000000..bd530c8e3b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
+ * configured for each.
+ * Internally, the input format delegates to
+ * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * and thus has the same performance advantages;
+ * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * more details.
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
+ * scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables
+ * included in each snapshot/scan
+ * pair.
+ * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob
+ * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache
+ * .hadoop.fs.Path)}
+ * can be used to configure the job.
+ * {@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ * MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ *
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp
+ * directory. Input splits and
+ * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
+ * .TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
+ * permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+ private final MultiTableSnapshotInputFormatImpl delegate;
+
+ public MultiTableSnapshotInputFormat() {
+ this.delegate = new MultiTableSnapshotInputFormatImpl();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ delegate.getSplits(jobContext.getConfiguration());
+ List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+ for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+ rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+ }
+
+ return rtn;
+ }
+
+ public static void setInput(Configuration configuration,
+ Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+ new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 00000000000..e9ce5a3f0a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.ConfigurationUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Shared implementation of mapreduce code over multiple table snapshots.
+ * Utilized by both the mapreduce ({@link org.apache.hadoop.hbase.mapreduce
+ * .MultiTableSnapshotInputFormat}) and mapred
+ * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
+ */
+@InterfaceAudience.LimitedPrivate({ "HBase" })
+@InterfaceStability.Evolving
+public class MultiTableSnapshotInputFormatImpl {
+
+ private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
+
+ public static final String RESTORE_DIRS_KEY =
+ "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
+ public static final String SNAPSHOT_TO_SCANS_KEY =
+ "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
+
+ /**
+ * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
+ * restoreDir.
+ * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
+ *
+ * @param conf
+ * @param snapshotScans
+ * @param restoreDir
+ * @throws IOException
+ */
+ public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
+ Path restoreDir) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
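+ // record the snapshot -> scans mapping and per-snapshot restore directories in the conf, then restore the snapshots up front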
+ setSnapshotToScans(conf, snapshotScans);
+ Map<String, Path> restoreDirs =
+ generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
+ setSnapshotDirs(conf, restoreDirs);
+ restoreSnapshots(conf, restoreDirs, fs);
+ }
+
+ /**
+ * Return the list of splits extracted from the scans/snapshots pushed to conf by
+ * {@link
+ * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
+ *
+ * @param conf Configuration to determine splits from
+ * @return Return the list of splits extracted from the scans/snapshots pushed to conf
+ * @throws IOException
+ */
+ public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
+ throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
+
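+ // one split is produced per (scan, region) pair that overlaps, across every configured snapshot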
+ Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
+ Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
+ String snapshotName = entry.getKey();
+
+ Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
+
+ SnapshotManifest manifest =
+ TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
+ List<HRegionInfo> regionInfos =
+ TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
+
+ for (Scan scan : entry.getValue()) {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
+ rtn.addAll(splits);
+ }
+ }
+ return rtn;
+ }
+
+ /**
+ * Retrieve the snapshot name -> list of scans mapping pushed to configuration by
+ * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
+ *
+ * @param conf Configuration to extract name -> list of scans mappings from.
+ * @return the snapshot name -> list of scans mapping pushed to configuration
+ * @throws IOException
+ */
+ public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
+
+ Map<String, Collection<Scan>> rtn = Maps.newHashMap();
+
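+ // rebuild the snapshot -> scans mapping from the flattened (snapshot name, serialized scan) entries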
+ for (Map.Entry<String, String> entry : ConfigurationUtil
+ .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
+ String snapshotName = entry.getKey();
+ String scan = entry.getValue();
+
+ Collection<Scan> snapshotScans = rtn.get(snapshotName);
+ if (snapshotScans == null) {
+ snapshotScans = Lists.newArrayList();
+ rtn.put(snapshotName, snapshotScans);
+ }
+
+ snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
+ }
+
+ return rtn;
+ }
+
+ /**
+ * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
+ *
+ * @param conf
+ * @param snapshotScans
+ * @throws IOException
+ */
+ public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
+ throws IOException {
+ // flatten out snapshotScans for serialization to the job conf
+ List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
+
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+ String snapshotName = entry.getKey();
+ Collection<Scan> scans = entry.getValue();
+
+ // serialize all scans and map them to the appropriate snapshot
+ for (Scan scan : scans) {
+ snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
+ TableMapReduceUtil.convertScanToString(scan)));
+ }
+ }
+
+ ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
+ }
+
+ /**
+ * Retrieve the directories into which snapshots have been restored from
+ * ({@link #RESTORE_DIRS_KEY})
+ *
+ * @param conf Configuration to extract restore directories from
+ * @return the directories into which snapshots have been restored from
+ * @throws IOException
+ */
+ public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
+ List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
+ Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
+
+ for (Map.Entry<String, String> kvp : kvps) {
+ rtn.put(kvp.getKey(), new Path(kvp.getValue()));
+ }
+
+ return rtn;
+ }
+
+ public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
+ Map<String, String> toSet = Maps.newHashMap();
+
+ for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+ toSet.put(entry.getKey(), entry.getValue().toString());
+ }
+
+ ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
+ }
+
+ /**
+ * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
+ * return a map from the snapshot to the restore directory.
+ *
+ * @param snapshots collection of snapshot names to restore
+ * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
+ * @return a mapping from snapshot name to the directory in which that snapshot has been restored
+ */
+ private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
+ Path baseRestoreDir) {
+ Map<String, Path> rtn = Maps.newHashMap();
+
+ for (String snapshotName : snapshots) {
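+ // a random UUID suffix keeps restore directories distinct across snapshots and job runs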
+ Path restoreSnapshotDir =
+ new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
+ rtn.put(snapshotName, restoreSnapshotDir);
+ }
+
+ return rtn;
+ }
+
+ /**
+ * Restore each (snapshot name, restore directory) pair in snapshotToDir
+ *
+ * @param conf configuration to restore with
+ * @param snapshotToDir mapping from snapshot names to restore directories
+ * @param fs filesystem to do snapshot restoration on
+ * @throws IOException
+ */
+ public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
+ throws IOException {
+ // TODO: restore from record readers to parallelize.
+ Path rootDir = FSUtils.getRootDir(conf);
+
+ for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
+ String snapshotName = entry.getKey();
+ Path restoreDir = entry.getValue();
+ LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
+ + " for MultiTableSnapshotInputFormat");
+ restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
+ }
+ }
+
+ void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
+ FileSystem fs) throws IOException {
+ RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 7d793ab27bb..769c40bce32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -18,20 +18,8 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yammer.metrics.core.MetricsRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -62,6 +50,21 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
/**
* Utility for {@link TableMapper} and {@link TableReducer}
*/
@@ -305,6 +308,44 @@ public class TableMapReduceUtil {
conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
}
+ /**
+ * Sets up the job for reading from one or more table snapshots, with one or more scans
+ * per snapshot.
+ * It bypasses hbase servers and reads directly from snapshot files.
+ *
+ * @param snapshotScans map of snapshot name to scans on that snapshot.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ */
+ public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+ Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+ Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+ MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
+
+ job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
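+ // merge the HBase resource files (hbase-default.xml, hbase-site.xml) into the job configuration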
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ addDependencyJars(job.getConfiguration(), MetricsRegistry.class);
+ }
+
+ resetCacheConfig(job.getConfiguration());
+ }
+
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 44d88c5d7bf..c40396f140e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -18,19 +18,14 @@
package org.apache.hadoop.hbase.mapreduce;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
@@ -41,7 +36,12 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
/**
* TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
@@ -98,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result>
- List<String> locations) {
- this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+ List<String> locations, Scan scan, Path restoreDir) {
+ this.delegate =
+ new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 8496868d03f..1dfbfd3c7d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
@@ -61,9 +64,11 @@ public class TableSnapshotInputFormatImpl {
// TODO: Snapshots files are owned in fs by the hbase user. There is no
// easy way to delegate access.
+ public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
+
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
// key for specifying the root dir of the restored snapshot
- private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+ protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
private static final String LOCALITY_CUTOFF_MULTIPLIER =
@@ -74,14 +79,18 @@ public class TableSnapshotInputFormatImpl {
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
+
private HTableDescriptor htd;
private HRegionInfo regionInfo;
private String[] locations;
+ private String scan;
+ private String restoreDir;
// constructor for mapreduce framework / Writable
public InputSplit() {}
- public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations) {
- public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
+ public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+ Scan scan, Path restoreDir) {
this.htd = htd;
this.regionInfo = regionInfo;
if (locations == null || locations.isEmpty()) {
@@ -89,6 +98,25 @@ public class TableSnapshotInputFormatImpl {
} else {
this.locations = locations.toArray(new String[locations.size()]);
}
+ try {
+ this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
+ } catch (IOException e) {
+ LOG.warn("Failed to convert Scan to String", e);
+ }
+
+ this.restoreDir = restoreDir.toString();
+ }
+
+ public HTableDescriptor getHtd() {
+ return htd;
+ }
+
+ public String getScan() {
+ return scan;
+ }
+
+ public String getRestoreDir() {
+ return restoreDir;
}
public long getLength() {
@@ -128,6 +156,10 @@ public class TableSnapshotInputFormatImpl {
byte[] buf = baos.toByteArray();
out.writeInt(buf.length);
out.write(buf);
+
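+ // append the serialized scan and restore directory so the split carries everything needed to read its region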
+ Bytes.writeByteArray(out, Bytes.toBytes(scan));
+ Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+
}
@Override
@@ -140,6 +172,9 @@ public class TableSnapshotInputFormatImpl {
this.regionInfo = HRegionInfo.convert(split.getRegion());
List<String> locationsList = split.getLocationsList();
this.locations = locationsList.toArray(new String[locationsList.size()]);
+
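+ // read back the scan and restore directory appended by write()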
+ this.scan = Bytes.toString(Bytes.readByteArray(in));
+ this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
}
}
@@ -158,28 +193,12 @@ public class TableSnapshotInputFormatImpl {
}
public void initialize(InputSplit split, Configuration conf) throws IOException {
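+ // the scan now travels with each split rather than being rebuilt from the job configuration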
+ this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split;
HTableDescriptor htd = split.htd;
HRegionInfo hri = this.split.getRegionInfo();
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
- Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root
- // directory where snapshot was restored
-
- // create scan
- // TODO: mapred does not support scan as input API. Work around for now.
- if (conf.get(TableInputFormat.SCAN) != null) {
- scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
- } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
- String[] columns =
- conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
- scan = new Scan();
- for (String col : columns) {
- scan.addFamily(Bytes.toBytes(col));
- }
- } else {
- throw new IllegalArgumentException("A Scan is not configured for this job");
- }
// region is immutable, this should be fine,
// otherwise we have to set the thread read point
@@ -187,7 +206,8 @@ public class TableSnapshotInputFormatImpl {
// disable caching of data blocks
scan.setCacheBlocks(false);
- scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+ scanner =
+ new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
}
public boolean nextKeyValue() throws IOException {
@@ -233,18 +253,40 @@ public class TableSnapshotInputFormatImpl {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
- Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
- SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
- SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+ SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
+
+ List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+
+ // TODO: mapred does not support scan as input API. Work around for now.
+ Scan scan = extractScanFromConf(conf);
+ // the temp dir where the snapshot is restored
+ Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+
+ return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+ }
+
+ public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
if (regionManifests == null) {
throw new IllegalArgumentException("Snapshot seems empty");
}
- // load table descriptor
- HTableDescriptor htd = manifest.getTableDescriptor();
+ List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
- // TODO: mapred does not support scan as input API. Work around for now.
+ for (SnapshotRegionManifest regionManifest : regionManifests) {
+ regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+ }
+ return regionInfos;
+ }
+
+ public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
+ Path rootDir, FileSystem fs) throws IOException {
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+ return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+ }
+
+ public static Scan extractScanFromConf(Configuration conf) throws IOException {
Scan scan = null;
if (conf.get(TableInputFormat.SCAN) != null) {
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
@@ -258,29 +300,35 @@ public class TableSnapshotInputFormatImpl {
} else {
throw new IllegalArgumentException("Unable to create scan");
}
- // the temp dir where the snapshot is restored
- Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+ return scan;
+ }
+
+ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
+ List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
+ // load table descriptor
+ HTableDescriptor htd = manifest.getTableDescriptor();
+
Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
List<InputSplit> splits = new ArrayList<InputSplit>();
- for (SnapshotRegionManifest regionManifest : regionManifests) {
+ for (HRegionInfo hri : regionManifests) {
// load region descriptor
- HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
- if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
- hri.getStartKey(), hri.getEndKey())) {
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+ hri.getEndKey())) {
// compute HDFS locations from snapshot files (which will get the locations for
// referred hfiles)
List<String> hosts = getBestLocations(conf,
- HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
int len = Math.min(3, hosts.size());
hosts = hosts.subList(0, len);
- splits.add(new InputSplit(htd, hri, hosts));
+ splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
return splits;
+
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java
new file mode 100644
index 00000000000..efb3170e07f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilities for storing more complex collection types in
+ * {@link org.apache.hadoop.conf.Configuration} instances.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class ConfigurationUtil {
+ // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet,
+ // nor is it valid for paths
+ public static final char KVP_DELIMITER = '^';
+
+ // Disallow instantiation
+ private ConfigurationUtil() {
+
+ }
+
+ /**
+ * Store a collection of Map.Entry's in conf, with each entry separated by ','
+ * and key values delimited by {@link #KVP_DELIMITER}
+ *
+ * @param conf configuration to store the collection in
+ * @param key overall key to store keyValues under
+ * @param keyValues kvps to be stored under key in conf
+ */
+ public static void setKeyValues(Configuration conf, String key,
+ Collection<Map.Entry<String, String>> keyValues) {
+ setKeyValues(conf, key, keyValues, KVP_DELIMITER);
+ }
+
+ /**
+ * Store a collection of Map.Entry's in conf, with each entry separated by ','
+ * and key values delimited by delimiter.
+ *
+ * @param conf configuration to store the collection in
+ * @param key overall key to store keyValues under
+ * @param keyValues kvps to be stored under key in conf
+ * @param delimiter character used to separate each kvp
+ */
+ public static void setKeyValues(Configuration conf, String key,
+ Collection<Map.Entry<String, String>> keyValues, char delimiter) {
+ List<String> serializedKvps = Lists.newArrayList();
+
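+ // each entry becomes "key" + delimiter + "value"; the strings are then stored as a comma-separated list under key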
+ for (Map.Entry<String, String> kvp : keyValues) {
+ serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue());
+ }
+
+ conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()]));
+ }
+
+ /**
+ * Retrieve a list of key value pairs from configuration, stored under the provided key
+ *
+ * @param conf configuration to retrieve kvps from
+ * @param key key under which the key values are stored
+ * @return the list of kvps stored under key in conf, or null if the key isn't present.
+ * @see #setKeyValues(Configuration, String, Collection, char)
+ */
+ public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key) {
+ return getKeyValues(conf, key, KVP_DELIMITER);
+ }
+
+ /**
+ * Retrieve a list of key value pairs from configuration, stored under the provided key
+ *
+ * @param conf configuration to retrieve kvps from
+ * @param key key under which the key values are stored
+ * @param delimiter character used to separate each kvp
+ * @return the list of kvps stored under key in conf, or null if the key isn't present.
+ * @see #setKeyValues(Configuration, String, Collection, char)
+ */
+ public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key,
+ char delimiter) {
+ String[] kvps = conf.getStrings(key);
+
+ if (kvps == null) {
+ return null;
+ }
+
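+ // split each stored string back into its key and value on the delimiter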
+ List<Map.Entry<String, String>> rtn = Lists.newArrayList();
+
+ for (String kvp : kvps) {
+ String[] splitKvp = StringUtils.split(kvp, delimiter);
+
+ if (splitKvp.length != 2) {
+ throw new IllegalArgumentException(
+ "Expected key value pair for configuration key '" + key + "'" + " to be of form '"
+ + delimiter + "; was " + kvp + " instead");
+ }
+
+ rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1]));
+ }
+ return rtn;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 00000000000..915a35d2387
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat
+ extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
+
+ @Override
+ protected void runJob(String jobName, Configuration c, List<Scan> scans)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ JobConf job = new JobConf(TEST_UTIL.getConfiguration());
+
+ job.setJobName(jobName);
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
+ TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+
+ TableMapReduceUtil.addDependencyJars(job);
+
+ job.setReducerClass(Reducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+
+ RunningJob runningJob = JobClient.runJob(job);
+ runningJob.waitForCompletion();
+ assertTrue(runningJob.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+ public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
+ implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
+ Reporter reporter) throws IOException {
+ makeAssertions(key, value);
+ outputCollector.collect(key, key);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+
+ }
+ }
+
+ public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
+ org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+
+ private JobConf jobConf;
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
+ OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
+ throws IOException {
+ makeAssertions(key, Lists.newArrayList(values));
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ super.cleanup(this.jobConf);
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
new file mode 100644
index 00000000000..6129b265337
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base set of tests and setup for input formats touching multiple tables.
+ */
+public abstract class MultiTableInputFormatTestBase {
+ static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
+ public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ static final String TABLE_NAME = "scantest";
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stpRow";
+
+ static List<String> TABLES = Lists.newArrayList();
+
+ static {
+ for (int i = 0; i < 3; i++) {
+ TABLES.add(TABLE_NAME + String.valueOf(i));
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // switch TIF to log at DEBUG level
+ TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+ // create and fill table
+ for (String tableName : TABLES) {
+ try (HTable table =
+ TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName),
+ INPUT_FAMILY, 4)) {
+ TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
+ }
+ }
+ // start MR cluster
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Configuration c = TEST_UTIL.getConfiguration();
+ FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+ }
+
+ /**
+ * Pass the key and value to reducer.
+ */
+ public static class ScanMapper extends
+ TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+ /**
+ * Pass the key and value to reduce.
+ *
+ * @param key The key, here "aaa", "aab" etc.
+ * @param value The value is the same as the key.
+ * @param context The task context.
+ * @throws IOException When reading the rows fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ makeAssertions(key, value);
+ context.write(key, key);
+ }
+
+ public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+ value.getMap();
+ if (!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+ LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+ ", value -> " + val);
+ }
+ }
+
+ /**
+ * Checks the last and first keys seen against the scanner boundaries.
+ */
+ public static class ScanReducer
+ extends
+ Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+ private String first = null;
+ private String last = null;
+
+ @Override
+ protected void reduce(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values, Context context)
+ throws IOException, InterruptedException {
+ makeAssertions(key, values);
+ }
+
+ protected void makeAssertions(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values) {
+ int count = 0;
+ for (ImmutableBytesWritable value : values) {
+ String val = Bytes.toStringBinary(value.get());
+ LOG.debug("reduce: key[" + count + "] -> " +
+ Bytes.toStringBinary(key.get()) + ", value -> " + val);
+ if (first == null) first = val;
+ last = val;
+ count++;
+ }
+ assertEquals(3, count);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ Configuration c = context.getConfiguration();
+ cleanup(c);
+ }
+
+ protected void cleanup(Configuration c) {
+ String startRow = c.get(KEY_STARTROW);
+ String lastRow = c.get(KEY_LASTROW);
+ LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+ startRow + "\"");
+ LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+ "\"");
+ if (startRow != null && startRow.length() > 0) {
+ assertEquals(startRow, first);
+ }
+ if (lastRow != null && lastRow.length() > 0) {
+ assertEquals(lastRow, last);
+ }
+ }
+ }
+
+ @Test
+ public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, null, null);
+ }
+
+ @Test
+ public void testScanEmptyToAPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ @Test
+ public void testScanOBBToOPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("obb", "opp", "opo");
+ }
+
+ @Test
+ public void testScanYZYToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("yzy", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ private void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName =
+ "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+ (stop != null ? stop.toUpperCase() : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ c.set(KEY_STARTROW, start != null ? start : "");
+ c.set(KEY_LASTROW, last != null ? last : "");
+
+ List<Scan> scans = new ArrayList<>();
+
+ for (String tableName : TABLES) {
+ Scan scan = new Scan();
+
+ scan.addFamily(INPUT_FAMILY);
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
+
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
+
+ scans.add(scan);
+
+ LOG.info("scan before: " + scan);
+ }
+
+ runJob(jobName, c, scans);
+ }
+
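+ // Creates the job, lets the subclass wire up the input format via initJob(), runs it, and asserts success.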
+ protected void runJob(String jobName, Configuration c, List<Scan> scans)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = new Job(c, jobName);
+
+ initJob(scans, job);
+ job.setReducerClass(ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ job.waitForCompletion(true);
+ assertTrue(job.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
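+ // Subclasses configure the mapper and input format here (multi-table scans or table snapshots).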
+ protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
+
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
index f2827acfa35..1bfd14a280b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
@@ -17,236 +17,34 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
+import java.util.List;
+
/**
* Tests various scan start and stop row scenarios. This is set in a scan and
* tested in a MapReduce job to see if that is handed over and done properly
* too.
*/
@Category({VerySlowMapReduceTests.class, LargeTests.class})
-public class TestMultiTableInputFormat {
-
- private static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
- static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- static final String TABLE_NAME = "scantest";
- static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
- static final String KEY_STARTROW = "startRow";
- static final String KEY_LASTROW = "stpRow";
+public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {
@BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // switch TIF to log at DEBUG level
+ public static void setupLogging() {
TEST_UTIL.enableDebug(MultiTableInputFormat.class);
- TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
- // start mini hbase cluster
- TEST_UTIL.startMiniCluster(3);
- // create and fill table
- for (int i = 0; i < 3; i++) {
- try (HTable table =
- TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)),
- INPUT_FAMILY, 4)) {
- TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
- }
- }
- // start MR cluster
- TEST_UTIL.startMiniMapReduceCluster();
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniMapReduceCluster();
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @After
- public void tearDown() throws Exception {
- Configuration c = TEST_UTIL.getConfiguration();
- FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
- }
-
- /**
- * Pass the key and value to reducer.
- */
- public static class ScanMapper extends
- TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
- /**
- * Pass the key and value to reduce.
- *
- * @param key The key, here "aaa", "aab" etc.
- * @param value The value is the same as the key.
- * @param context The task context.
- * @throws IOException When reading the rows fails.
- */
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- if (value.size() != 1) {
- throw new IOException("There should only be one input column");
- }
- Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
- value.getMap();
- if (!cf.containsKey(INPUT_FAMILY)) {
- throw new IOException("Wrong input columns. Missing: '" +
- Bytes.toString(INPUT_FAMILY) + "'.");
- }
- String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
- LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
- ", value -> " + val);
- context.write(key, key);
- }
- }
-
- /**
- * Checks the last and first keys seen against the scanner boundaries.
- */
- public static class ScanReducer
- extends
- Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
- private String first = null;
- private String last = null;
-
- @Override
- protected void reduce(ImmutableBytesWritable key,
- Iterable<ImmutableBytesWritable> values, Context context)
- throws IOException, InterruptedException {
- int count = 0;
- for (ImmutableBytesWritable value : values) {
- String val = Bytes.toStringBinary(value.get());
- LOG.debug("reduce: key[" + count + "] -> " +
- Bytes.toStringBinary(key.get()) + ", value -> " + val);
- if (first == null) first = val;
- last = val;
- count++;
- }
- assertEquals(3, count);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException {
- Configuration c = context.getConfiguration();
- String startRow = c.get(KEY_STARTROW);
- String lastRow = c.get(KEY_LASTROW);
- LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
- startRow + "\"");
- LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
- "\"");
- if (startRow != null && startRow.length() > 0) {
- assertEquals(startRow, first);
- }
- if (lastRow != null && lastRow.length() > 0) {
- assertEquals(lastRow, last);
- }
- }
- }
-
- @Test
- public void testScanEmptyToEmpty() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan(null, null, null);
- }
-
- @Test
- public void testScanEmptyToAPP() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan(null, "app", "apo");
- }
-
- @Test
- public void testScanOBBToOPP() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan("obb", "opp", "opo");
- }
-
- @Test
- public void testScanYZYToEmpty() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan("yzy", null, "zzz");
- }
-
- /**
- * Tests a MR scan using specific start and stop rows.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- private void testScan(String start, String stop, String last)
- throws IOException, InterruptedException, ClassNotFoundException {
- String jobName =
- "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
- (stop != null ? stop.toUpperCase() : "Empty");
- LOG.info("Before map/reduce startup - job " + jobName);
- Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-
- c.set(KEY_STARTROW, start != null ? start : "");
- c.set(KEY_LASTROW, last != null ? last : "");
-
- List<Scan> scans = new ArrayList<Scan>();
-
- for(int i=0; i<3; i++){
- Scan scan = new Scan();
-
- scan.addFamily(INPUT_FAMILY);
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
-
- if (start != null) {
- scan.setStartRow(Bytes.toBytes(start));
- }
- if (stop != null) {
- scan.setStopRow(Bytes.toBytes(stop));
- }
-
- scans.add(scan);
-
- LOG.info("scan before: " + scan);
- }
-
- Job job = new Job(c, jobName);
-
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
- job.setReducerClass(ScanReducer.class);
- job.setNumReduceTasks(1); // one to get final "first" and "last" key
- FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
- LOG.info("Started " + job.getJobName());
- job.waitForCompletion(true);
- assertTrue(job.isSuccessful());
- LOG.info("After map/reduce completion - job " + jobName);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 00000000000..6285ca18997
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimaps;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+ protected Path restoreDir;
+
+ @BeforeClass
+ public static void setUpSnapshots() throws Exception {
+
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+ // take a snapshot of every table we have.
+ for (String tableName : TABLES) {
+ SnapshotTestingUtils
+ .createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
+ ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
+ snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+ TEST_UTIL.getTestFileSystem(), true);
+ }
+ }
+
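+ // Root directory under which snapshots are restored; handed to initMultiTableSnapshotMapperJob in initJob() below.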
+ @Before
+ public void setUp() throws Exception {
+ this.restoreDir = new Path("/tmp");
+
+ }
+
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
+ TableMapReduceUtil
+ .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+ }
+
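+ // Groups the scans by the snapshot of the table each scan targets, keyed by snapshot name.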
+ protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
+ return Multimaps.index(scans, new Function<Scan, String>() {
+ @Nullable
+ @Override
+ public String apply(Scan input) {
+ return snapshotNameForTable(
+ Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+ }
+ }).asMap();
+ }
+
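+ // Snapshot naming convention shared with setUpSnapshots(): the table name suffixed with "_snapshot".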
+ public static String snapshotNameForTable(String tableName) {
+ return tableName + "_snapshot";
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 00000000000..b4b8056fe60
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({ SmallTests.class })
+public class TestMultiTableSnapshotInputFormatImpl {
+
+ private MultiTableSnapshotInputFormatImpl subject;
+ private Map<String, Collection<Scan>> snapshotScans;
+ private Path restoreDir;
+ private Configuration conf;
+ private Path rootDir;
+
+ @Before
+ public void setUp() throws Exception {
+ this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+ // mock out restoreSnapshot
+ // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper
+ // dependency into the input format. However, we need a new RestoreSnapshotHelper per
+ // snapshot in the current design, and it *also* feels weird to introduce a
+ // RestoreSnapshotHelperFactory and inject that, which would probably be the more "pure"
+ // way of doing things. This is the lesser of two evils, perhaps?
+ doNothing().when(this.subject).
+ restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
+ any(Path.class), any(FileSystem.class));
+
+ this.conf = new Configuration();
+ this.rootDir = new Path("file:///test-root-dir");
+ FSUtils.setRootDir(conf, rootDir);
+ this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
+ ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
+ ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+ new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
+
+ this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+ }
+
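+ // Invokes the method under test; the individual tests below inspect what it wrote into conf.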
+ public void callSetInput() throws IOException {
+ subject.setInput(this.conf, snapshotScans, restoreDir);
+ }
+
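+ // Scan does not implement equals(), so wrap each scan in a value object that compares start/stop rows.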
+ public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
+ Map<String, Collection<Scan>> snapshotScans) throws IOException {
+ Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+ List<ScanWithEquals> scans = Lists.newArrayList();
+
+ for (Scan scan : entry.getValue()) {
+ scans.add(new ScanWithEquals(scan));
+ }
+ rtn.put(entry.getKey(), scans);
+ }
+
+ return rtn;
+ }
+
+ public static class ScanWithEquals {
+
+ private final String startRow;
+ private final String stopRow;
+
+ /**
+ * Creates a new instance of this class while copying all values.
+ *
+ * @param scan The scan instance to copy from.
+ * @throws java.io.IOException When copying the values fails.
+ */
+ public ScanWithEquals(Scan scan) throws IOException {
+ this.startRow = Bytes.toStringBinary(scan.getStartRow());
+ this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ScanWithEquals)) {
+ return false;
+ }
+ ScanWithEquals otherScan = (ScanWithEquals) obj;
+ return Objects.equals(this.startRow, otherScan.startRow) && Objects
+ .equals(this.stopRow, otherScan.stopRow);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow)
+ .add("stopRow", stopRow).toString();
+ }
+ }
+
+ @Test
+ public void testSetInputSetsSnapshotToScans() throws Exception {
+
+ callSetInput();
+
+ Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+ // convert to scans we can use .equals on
+ Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+ Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+ assertEquals(expectedWithEquals, actualWithEquals);
+ }
+
+ @Test
+ public void testSetInputPushesRestoreDirectories() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+ }
+
+ @Test
+ public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ for (Path snapshotDir : restoreDirs.values()) {
+ assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
+ snapshotDir.getParent());
+ }
+ }
+
+ @Test
+ public void testSetInputRestoresSnapshots() throws Exception {
+ callSetInput();
+
+ Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+ for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+ verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
+ eq(entry.getValue()), any(FileSystem.class));
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java
new file mode 100644
index 00000000000..a9ecf9ed0ee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@Category({ SmallTests.class })
+public class TestConfigurationUtil {
+
+ private Configuration conf;
+ private Map<String, String> keyValues;
+ private String key;
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = new Configuration();
+ this.keyValues = ImmutableMap.of("k1", "v1", "k2", "v2");
+ this.key = "my_conf_key";
+ }
+
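+ // Round-trips the map entries through ConfigurationUtil under a single configuration key.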
+ public void callSetKeyValues() {
+ ConfigurationUtil.setKeyValues(conf, key, keyValues.entrySet());
+ }
+
+ public List<Map.Entry<String, String>> callGetKeyValues() {
+ return ConfigurationUtil.getKeyValues(conf, key);
+ }
+
+ @Test
+ public void testGetAndSetKeyValuesWithValues() throws Exception {
+ callSetKeyValues();
+ assertEquals(Lists.newArrayList(this.keyValues.entrySet()), callGetKeyValues());
+ }
+
+ @Test
+ public void testGetKeyValuesWithUnsetKey() throws Exception {
+ assertNull(callGetKeyValues());
+ }
+
+}