HBASE-13356 HBase should provide an InputFormat supporting multiple scans in mapreduce jobs over snapshots (Andrew Mains)
parent de01553bc4
commit 722fd17069
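For orientation before the diff: a minimal usage sketch of the API this commit adds, based on the new TableMapReduceUtil.initMultiTableSnapshotMapperJob signature below. The mapper class, snapshot names, and output key/value types are illustrative placeholders, not part of the commit.

    // Hypothetical job setup using the new multi-snapshot input format (mapreduce API).
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf);
    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
        "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
        "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));
    // Each snapshot is restored into a unique subdirectory of this path before the job runs.
    Path restoreDir = new Path("/tmp/snapshot_restore_dir");
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMapper.class,
        ImmutableBytesWritable.class, Result.class, job, true, restoreDir);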
@@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred
 * .TableSnapshotInputFormat}
 * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
 * configured for each.
 * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce
 * .TableSnapshotInputFormat}
 * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce
 * .TableSnapshotInputFormat} for
 * more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map
 * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
 * scan will be applied;
 * the overall dataset for the job is defined by the concatenation of the regions and tables
 * included in each snapshot/scan
 * pair.
 * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
 * Class, Class, Class, JobConf, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir")
 * TableMapReduceUtil.initTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and
 * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
 * .TableSnapshotInputFormat}
 * (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the
 * same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir.
   * Sets: {@link org.apache.hadoop.hbase.mapreduce
   * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce
   * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   *
   * @param conf
   * @param snapshotScans
   * @param restoreDir
   * @throws IOException
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}
@@ -18,37 +18,32 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -127,6 +122,40 @@ public class TableMapReduceUtil {
     }
   }
+
+  /**
+   * Sets up the job for reading from one or more multiple table snapshots, with one or more scans
+   * per snapshot.
+   * It bypasses hbase servers and read directly from snapshot files.
+   *
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+    job.setInputFormat(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+  }
 
   /**
    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
    * and read directly from snapshot files.
@@ -18,12 +18,13 @@
 
 package org.apache.hadoop.hbase.mapred;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
 import org.apache.hadoop.mapred.InputFormat;
@@ -60,8 +61,9 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWrita
     }
 
     public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations) {
-      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
     }
 
     @Override
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
 * configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * and thus has the same performance advantages;
 * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
 * more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map
 * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
 * scan will be applied;
 * the overall dataset for the job is defined by the concatenation of the regions and tables
 * included in each snapshot/scan
 * pair.
 * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob
 * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache
 * .hadoop.fs.Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir")
 * TableMapReduceUtil.initTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and
 * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
 * .TableSnapshotInputFormat}
 * (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the
 * same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public List<InputSplit> getSplits(JobContext jobContext)
      throws IOException, InterruptedException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
        delegate.getSplits(jobContext.getConfiguration());
    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());

    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
    }

    return rtn;
  }

  public static void setInput(Configuration configuration,
      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
  }
}
@@ -0,0 +1,252 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ConfigurationUtil;
import org.apache.hadoop.hbase.util.FSUtils;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Shared implementation of mapreduce code over multiple table snapshots.
 * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce
 * .MultiTableSnapshotInputFormat} and mapred
 * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
 */
@InterfaceAudience.LimitedPrivate({ "HBase" })
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormatImpl {

  private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);

  public static final String RESTORE_DIRS_KEY =
      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
  public static final String SNAPSHOT_TO_SCANS_KEY =
      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir.
   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
   *
   * @param conf
   * @param snapshotScans
   * @param restoreDir
   * @throws IOException
   */
  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    setSnapshotToScans(conf, snapshotScans);
    Map<String, Path> restoreDirs =
        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
    setSnapshotDirs(conf, restoreDirs);
    restoreSnapshots(conf, restoreDirs, fs);
  }

  /**
   * Return the list of splits extracted from the scans/snapshots pushed to conf by
   * {@link
   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
   *
   * @param conf Configuration to determine splits from
   * @return Return the list of splits extracted from the scans/snapshots pushed to conf
   * @throws IOException
   */
  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
      throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();

    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
      String snapshotName = entry.getKey();

      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);

      SnapshotManifest manifest =
          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
      List<HRegionInfo> regionInfos =
          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);

      for (Scan scan : entry.getValue()) {
        List<TableSnapshotInputFormatImpl.InputSplit> splits =
            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
        rtn.addAll(splits);
      }
    }
    return rtn;
  }

  /**
   * Retrieve the snapshot name -> list<scan> mapping pushed to configuration by
   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
   *
   * @param conf Configuration to extract name -> list<scan> mappings from.
   * @return the snapshot name -> list<scan> mapping pushed to configuration
   * @throws IOException
   */
  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {

    Map<String, Collection<Scan>> rtn = Maps.newHashMap();

    for (Map.Entry<String, String> entry : ConfigurationUtil
        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
      String snapshotName = entry.getKey();
      String scan = entry.getValue();

      Collection<Scan> snapshotScans = rtn.get(snapshotName);
      if (snapshotScans == null) {
        snapshotScans = Lists.newArrayList();
        rtn.put(snapshotName, snapshotScans);
      }

      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
    }

    return rtn;
  }

  /**
   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
   *
   * @param conf
   * @param snapshotScans
   * @throws IOException
   */
  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
      throws IOException {
    // flatten out snapshotScans for serialization to the job conf
    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();

    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
      String snapshotName = entry.getKey();
      Collection<Scan> scans = entry.getValue();

      // serialize all scans and map them to the appropriate snapshot
      for (Scan scan : scans) {
        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
            TableMapReduceUtil.convertScanToString(scan)));
      }
    }

    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
  }

  /**
   * Retrieve the directories into which snapshots have been restored from
   * ({@link #RESTORE_DIRS_KEY})
   *
   * @param conf Configuration to extract restore directories from
   * @return the directories into which snapshots have been restored from
   * @throws IOException
   */
  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());

    for (Map.Entry<String, String> kvp : kvps) {
      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
    }

    return rtn;
  }

  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
    Map<String, String> toSet = Maps.newHashMap();

    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
      toSet.put(entry.getKey(), entry.getValue().toString());
    }

    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
  }

  /**
   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
   * return a map from the snapshot to the restore directory.
   *
   * @param snapshots      collection of snapshot names to restore
   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
   */
  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
      Path baseRestoreDir) {
    Map<String, Path> rtn = Maps.newHashMap();

    for (String snapshotName : snapshots) {
      Path restoreSnapshotDir =
          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
      rtn.put(snapshotName, restoreSnapshotDir);
    }

    return rtn;
  }

  /**
   * Restore each (snapshot name, restore directory) pair in snapshotToDir
   *
   * @param conf          configuration to restore with
   * @param snapshotToDir mapping from snapshot names to restore directories
   * @param fs            filesystem to do snapshot restoration on
   * @throws IOException
   */
  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
      throws IOException {
    // TODO: restore from record readers to parallelize.
    Path rootDir = FSUtils.getRootDir(conf);

    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
      String snapshotName = entry.getKey();
      Path restoreDir = entry.getValue();
      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
          + " for MultiTableSnapshotInputFormat");
      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
    }
  }

  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
      FileSystem fs) throws IOException {
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
  }

}
@@ -18,20 +18,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +50,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
 */
@@ -305,6 +308,44 @@ public class TableMapReduceUtil {
     conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
   }
+
+  /**
+   * Sets up the job for reading from one or more table snapshots, with one or more scans
+   * per snapshot.
+   * It bypasses hbase servers and read directly from snapshot files.
+   *
+   * @param snapshotScans     map of snapshot name to scans on that snapshot.
+   * @param mapper            The mapper class to use.
+   * @param outputKeyClass    The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job               The current job to adjust.  Make sure the passed job is
+   *                          carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *                          job classes via the distributed cache (tmpjars).
+   */
+  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
+    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
+
+    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+      addDependencyJars(job.getConfiguration(), MetricsRegistry.class);
+    }
+
+    resetCacheConfig(job.getConfiguration());
+  }
 
   /**
    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
    * and read directly from snapshot files.
@@ -18,19 +18,14 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -41,7 +36,12 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
@@ -98,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
     }
 
     public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations) {
-      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+        List<String> locations, Scan scan, Path restoreDir) {
+      this.delegate =
+          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
     }
 
     @Override
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
@@ -61,9 +64,11 @@ public class TableSnapshotInputFormatImpl {
   // TODO: Snapshots files are owned in fs by the hbase user. There is no
   // easy way to delegate access.
 
+  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
+
   private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
   // key for specifying the root dir of the restored snapshot
-  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
 
   /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
   private static final String LOCALITY_CUTOFF_MULTIPLIER =
@@ -74,14 +79,18 @@ public class TableSnapshotInputFormatImpl {
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
+
     private HTableDescriptor htd;
     private HRegionInfo regionInfo;
     private String[] locations;
+    private String scan;
+    private String restoreDir;
 
     // constructor for mapreduce framework / Writable
     public InputSplit() {}
 
-    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
+    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+        Scan scan, Path restoreDir) {
       this.htd = htd;
       this.regionInfo = regionInfo;
       if (locations == null || locations.isEmpty()) {
@@ -89,6 +98,25 @@ public class TableSnapshotInputFormatImpl {
       } else {
         this.locations = locations.toArray(new String[locations.size()]);
       }
+      try {
+        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
+      } catch (IOException e) {
+        LOG.warn("Failed to convert Scan to String", e);
+      }
+
+      this.restoreDir = restoreDir.toString();
+    }
+
+    public HTableDescriptor getHtd() {
+      return htd;
+    }
+
+    public String getScan() {
+      return scan;
+    }
+
+    public String getRestoreDir() {
+      return restoreDir;
     }
 
     public long getLength() {
@@ -128,6 +156,10 @@ public class TableSnapshotInputFormatImpl {
       byte[] buf = baos.toByteArray();
       out.writeInt(buf.length);
       out.write(buf);
+
+      Bytes.writeByteArray(out, Bytes.toBytes(scan));
+      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+
     }
 
     @Override
@@ -140,6 +172,9 @@ public class TableSnapshotInputFormatImpl {
       this.regionInfo = HRegionInfo.convert(split.getRegion());
       List<String> locationsList = split.getLocationsList();
       this.locations = locationsList.toArray(new String[locationsList.size()]);
+
+      this.scan = Bytes.toString(Bytes.readByteArray(in));
+      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
     }
   }
 
@@ -158,28 +193,12 @@ public class TableSnapshotInputFormatImpl {
     }
 
     public void initialize(InputSplit split, Configuration conf) throws IOException {
+      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
       this.split = split;
       HTableDescriptor htd = split.htd;
       HRegionInfo hri = this.split.getRegionInfo();
       FileSystem fs = FSUtils.getCurrentFileSystem(conf);
 
-      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root
-      // directory where snapshot was restored
-
-      // create scan
-      // TODO: mapred does not support scan as input API. Work around for now.
-      if (conf.get(TableInputFormat.SCAN) != null) {
-        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
-      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
-        String[] columns =
-          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
-        scan = new Scan();
-        for (String col : columns) {
-          scan.addFamily(Bytes.toBytes(col));
-        }
-      } else {
-        throw new IllegalArgumentException("A Scan is not configured for this job");
-      }
-
       // region is immutable, this should be fine,
       // otherwise we have to set the thread read point
@@ -187,7 +206,8 @@ public class TableSnapshotInputFormatImpl {
       // disable caching of data blocks
       scan.setCacheBlocks(false);
 
-      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+      scanner =
+          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
     }
 
     public boolean nextKeyValue() throws IOException {
@@ -233,18 +253,40 @@ public class TableSnapshotInputFormatImpl {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
+
+    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+
+    // TODO: mapred does not support scan as input API. Work around for now.
+    Scan scan = extractScanFromConf(conf);
+    // the temp dir where the snapshot is restored
+    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+
+    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+  }
+
+  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
     List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
     if (regionManifests == null) {
       throw new IllegalArgumentException("Snapshot seems empty");
     }
 
-    // load table descriptor
-    HTableDescriptor htd = manifest.getTableDescriptor();
+    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
 
-    // TODO: mapred does not support scan as input API. Work around for now.
+    for (SnapshotRegionManifest regionManifest : regionManifests) {
+      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+    }
+    return regionInfos;
+  }
+
+  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
+      Path rootDir, FileSystem fs) throws IOException {
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+  }
+
+  public static Scan extractScanFromConf(Configuration conf) throws IOException {
     Scan scan = null;
     if (conf.get(TableInputFormat.SCAN) != null) {
       scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
@@ -258,29 +300,35 @@ public class TableSnapshotInputFormatImpl {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
-    // the temp dir where the snapshot is restored
-    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+    return scan;
+  }
+
+  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
+      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
+    // load table descriptor
+    HTableDescriptor htd = manifest.getTableDescriptor();
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (SnapshotRegionManifest regionManifest : regionManifests) {
+    for (HRegionInfo hri : regionManifests) {
       // load region descriptor
-      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
 
-      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+          hri.getEndKey())) {
         // compute HDFS locations from snapshot files (which will get the locations for
         // referred hfiles)
         List<String> hosts = getBestLocations(conf,
           HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
 
         int len = Math.min(3, hosts.size());
         hosts = hosts.subList(0, len);
-        splits.add(new InputSplit(htd, hri, hosts));
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
       }
     }
 
     return splits;
+
   }
+
   /**
@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;

import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Utilities for storing more complex collection types in
 * {@link org.apache.hadoop.conf.Configuration} instances.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class ConfigurationUtil {
  // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet,
  // nor is it valid for paths
  public static final char KVP_DELIMITER = '^';

  // Disallow instantiation
  private ConfigurationUtil() {

  }

  /**
   * Store a collection of Map.Entry's in conf, with each entry separated by ','
   * and key values delimited by {@link #KVP_DELIMITER}
   *
   * @param conf      configuration to store the collection in
   * @param key       overall key to store keyValues under
   * @param keyValues kvps to be stored under key in conf
   */
  public static void setKeyValues(Configuration conf, String key,
      Collection<Map.Entry<String, String>> keyValues) {
    setKeyValues(conf, key, keyValues, KVP_DELIMITER);
  }

  /**
   * Store a collection of Map.Entry's in conf, with each entry separated by ','
   * and key values delimited by delimiter.
   *
   * @param conf      configuration to store the collection in
   * @param key       overall key to store keyValues under
   * @param keyValues kvps to be stored under key in conf
   * @param delimiter character used to separate each kvp
   */
  public static void setKeyValues(Configuration conf, String key,
      Collection<Map.Entry<String, String>> keyValues, char delimiter) {
    List<String> serializedKvps = Lists.newArrayList();

    for (Map.Entry<String, String> kvp : keyValues) {
      serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue());
    }

    conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()]));
  }

  /**
   * Retrieve a list of key value pairs from configuration, stored under the provided key
   *
   * @param conf configuration to retrieve kvps from
   * @param key  key under which the key values are stored
   * @return the list of kvps stored under key in conf, or null if the key isn't present.
   * @see #setKeyValues(Configuration, String, Collection, char)
   */
  public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key) {
    return getKeyValues(conf, key, KVP_DELIMITER);
  }

  /**
   * Retrieve a list of key value pairs from configuration, stored under the provided key
   *
   * @param conf      configuration to retrieve kvps from
   * @param key       key under which the key values are stored
   * @param delimiter character used to separate each kvp
   * @return the list of kvps stored under key in conf, or null if the key isn't present.
   * @see #setKeyValues(Configuration, String, Collection, char)
   */
  public static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key,
      char delimiter) {
    String[] kvps = conf.getStrings(key);

    if (kvps == null) {
      return null;
    }

    List<Map.Entry<String, String>> rtn = Lists.newArrayList();

    for (String kvp : kvps) {
      String[] splitKvp = StringUtils.split(kvp, delimiter);

      if (splitKvp.length != 2) {
        throw new IllegalArgumentException(
            "Expected key value pair for configuration key '" + key + "'" + " to be of form '<key>"
                + delimiter + "<value>'; was " + kvp + " instead");
      }

      rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1]));
    }
    return rtn;
  }
}
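For reference, a minimal usage sketch of the ConfigurationUtil class added above; the example class name, configuration key, and stored values are hypothetical and only illustrate the round trip through setKeyValues/getKeyValues:

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ConfigurationUtil;

import java.util.List;
import java.util.Map;

public class ConfigurationUtilExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Serialize two made-up entries under a made-up key; each entry is stored as "key^value".
    ConfigurationUtil.setKeyValues(conf, "example.kvps",
        ImmutableMap.of("snapshot1", "dirA", "snapshot2", "dirB").entrySet());

    // Read them back; getKeyValues returns null if "example.kvps" was never set.
    List<Map.Entry<String, String>> restored = ConfigurationUtil.getKeyValues(conf, "example.kvps");
    for (Map.Entry<String, String> kvp : restored) {
      System.out.println(kvp.getKey() + " -> " + kvp.getValue());
    }
  }
}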
@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.assertTrue;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestMultiTableSnapshotInputFormat
    extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {

  private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);

  @Override
  protected void runJob(String jobName, Configuration c, List<Scan> scans)
      throws IOException, InterruptedException, ClassNotFoundException {
    JobConf job = new JobConf(TEST_UTIL.getConfiguration());

    job.setJobName(jobName);
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);

    TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

    TableMapReduceUtil.addDependencyJars(job);

    job.setReducerClass(Reducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());

    RunningJob runningJob = JobClient.runJob(job);
    runningJob.waitForCompletion();
    assertTrue(runningJob.isSuccessful());
    LOG.info("After map/reduce completion - job " + jobName);
  }

  public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
      implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
        Reporter reporter) throws IOException {
      makeAssertions(key, value);
      outputCollector.collect(key, key);
    }

    /**
     * Closes this stream and releases any system resources associated
     * with it. If the stream is already closed then invoking this
     * method has no effect.
     *
     * @throws IOException if an I/O error occurs
     */
    @Override
    public void close() throws IOException {
    }

    @Override
    public void configure(JobConf jobConf) {

    }
  }

  public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
      org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
          NullWritable, NullWritable> {

    private JobConf jobConf;

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
        OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
        throws IOException {
      makeAssertions(key, Lists.newArrayList(values));
    }

    /**
     * Closes this stream and releases any system resources associated
     * with it. If the stream is already closed then invoking this
     * method has no effect.
     *
     * @throws IOException if an I/O error occurs
     */
    @Override
    public void close() throws IOException {
      super.cleanup(this.jobConf);
    }

    @Override
    public void configure(JobConf jobConf) {
      this.jobConf = jobConf;
    }
  }
}
@ -0,0 +1,274 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Base set of tests and setup for input formats touching multiple tables.
 */
public abstract class MultiTableInputFormatTestBase {
  static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
  public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static final String TABLE_NAME = "scantest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stpRow";

  static List<String> TABLES = Lists.newArrayList();

  static {
    for (int i = 0; i < 3; i++) {
      TABLES.add(TABLE_NAME + String.valueOf(i));
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // switch TIF to log at DEBUG level
    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    for (String tableName : TABLES) {
      try (HTable table =
          TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName),
              INPUT_FAMILY, 4)) {
        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
      }
    }
    // start MR cluster
    TEST_UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniMapReduceCluster();
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
  }

  /**
   * Pass the key and value to reducer.
   */
  public static class ScanMapper extends
      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    /**
     * Pass the key and value to reduce.
     *
     * @param key The key, here "aaa", "aab" etc.
     * @param value The value is the same as the key.
     * @param context The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      makeAssertions(key, value);
      context.write(key, key);
    }

    public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
          value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILY) + "'.");
      }
      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
          ", value -> " + val);
    }
  }

  /**
   * Checks the last and first keys seen against the scanner boundaries.
   */
  public static class ScanReducer
      extends
      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
          NullWritable, NullWritable> {
    private String first = null;
    private String last = null;

    @Override
    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
        throws IOException, InterruptedException {
      makeAssertions(key, values);
    }

    protected void makeAssertions(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values) {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.debug("reduce: key[" + count + "] -> " +
            Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) first = val;
        last = val;
        count++;
      }
      assertEquals(3, count);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      Configuration c = context.getConfiguration();
      cleanup(c);
    }

    protected void cleanup(Configuration c) {
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
          startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
          "\"");
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }
  }

  @Test
  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan(null, null, null);
  }

  @Test
  public void testScanEmptyToAPP() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan(null, "app", "apo");
  }

  @Test
  public void testScanOBBToOPP() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan("obb", "opp", "opo");
  }

  @Test
  public void testScanYZYToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan("yzy", null, "zzz");
  }

  /**
   * Tests a MR scan using specific start and stop rows.
   *
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void testScan(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName =
        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
            (stop != null ? stop.toUpperCase() : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());

    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    List<Scan> scans = new ArrayList<Scan>();

    for (String tableName : TABLES) {
      Scan scan = new Scan();

      scan.addFamily(INPUT_FAMILY);
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));

      if (start != null) {
        scan.setStartRow(Bytes.toBytes(start));
      }
      if (stop != null) {
        scan.setStopRow(Bytes.toBytes(stop));
      }

      scans.add(scan);

      LOG.info("scan before: " + scan);
    }

    runJob(jobName, c, scans);
  }

  protected void runJob(String jobName, Configuration c, List<Scan> scans)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(c, jobName);

    initJob(scans, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    job.waitForCompletion(true);
    assertTrue(job.isSuccessful());
    LOG.info("After map/reduce completion - job " + jobName);
  }

  protected abstract void initJob(List<Scan> scans, Job job) throws IOException;

}
@ -17,236 +17,34 @@
 */

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.mapreduce.Job;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.util.List;

/**
 * Tests various scan start and stop row scenarios. This is set in a scan and
 * tested in a MapReduce job to see if that is handed over and done properly
 * too.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {

  @BeforeClass
  public static void setupLogging() {
    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
  }

  @Override
  protected void initJob(List<Scan> scans, Job job) throws IOException {
    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
  }
}
@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {

  protected Path restoreDir;

  @BeforeClass
  public static void setUpSnapshots() throws Exception {

    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);

    // take a snapshot of every table we have.
    for (String tableName : TABLES) {
      SnapshotTestingUtils
          .createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
              ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
              snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
              TEST_UTIL.getTestFileSystem(), true);
    }
  }

  @Before
  public void setUp() throws Exception {
    this.restoreDir = new Path("/tmp");
  }

  @Override
  protected void initJob(List<Scan> scans, Job job) throws IOException {
    TableMapReduceUtil
        .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
  }

  protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
    return Multimaps.index(scans, new Function<Scan, String>() {
      @Nullable
      @Override
      public String apply(Scan input) {
        return snapshotNameForTable(
            Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
      }
    }).asMap();
  }

  public static String snapshotNameForTable(String tableName) {
    return tableName + "_snapshot";
  }

}
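For orientation, a minimal driver sketch using the same initMultiTableSnapshotMapperJob call that the test above exercises through initJob; the driver class, mapper, snapshot names, and restore path below are hypothetical placeholders, not part of this patch:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

public class MultiSnapshotScanDriver {

  // Placeholder mapper that just passes row keys through.
  public static class PassThroughMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, key);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // One or more scans per snapshot; the snapshot names are made up for this sketch.
    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
        "snapshot_a", ImmutableList.<Scan>of(new Scan()),
        "snapshot_b", ImmutableList.<Scan>of(new Scan()));

    Job job = new Job(conf, "multi-snapshot-scan");

    // Restores each snapshot under the given directory and wires up the snapshot input format.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, PassThroughMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true,
        new Path("/tmp/snapshot-restore"));

    // Reducer/output configuration and job submission would follow as for any other MR job.
  }
}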
@ -0,0 +1,185 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;

@Category({ SmallTests.class })
public class TestMultiTableSnapshotInputFormatImpl {

  private MultiTableSnapshotInputFormatImpl subject;
  private Map<String, Collection<Scan>> snapshotScans;
  private Path restoreDir;
  private Configuration conf;
  private Path rootDir;

  @Before
  public void setUp() throws Exception {
    this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());

    // mock out restoreSnapshot
    // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper
    // dependency into the input format. However, we need a new RestoreSnapshotHelper per
    // snapshot in the current design, and it *also* feels weird to introduce a
    // RestoreSnapshotHelperFactory and inject that, which would probably be the more "pure"
    // way of doing things. This is the lesser of two evils, perhaps?
    doNothing().when(this.subject).
        restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
            any(Path.class), any(FileSystem.class));

    this.conf = new Configuration();
    this.rootDir = new Path("file:///test-root-dir");
    FSUtils.setRootDir(conf, rootDir);
    this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
        ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
        ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
            new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));

    this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
  }

  public void callSetInput() throws IOException {
    subject.setInput(this.conf, snapshotScans, restoreDir);
  }

  public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
      Map<String, Collection<Scan>> snapshotScans) throws IOException {
    Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();

    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
      List<ScanWithEquals> scans = Lists.newArrayList();

      for (Scan scan : entry.getValue()) {
        scans.add(new ScanWithEquals(scan));
      }
      rtn.put(entry.getKey(), scans);
    }

    return rtn;
  }

  public static class ScanWithEquals {

    private final String startRow;
    private final String stopRow;

    /**
     * Creates a new instance of this class while copying all values.
     *
     * @param scan The scan instance to copy from.
     * @throws java.io.IOException When copying the values fails.
     */
    public ScanWithEquals(Scan scan) throws IOException {
      this.startRow = Bytes.toStringBinary(scan.getStartRow());
      this.stopRow = Bytes.toStringBinary(scan.getStopRow());
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof ScanWithEquals)) {
        return false;
      }
      ScanWithEquals otherScan = (ScanWithEquals) obj;
      return Objects.equals(this.startRow, otherScan.startRow) && Objects
          .equals(this.stopRow, otherScan.stopRow);
    }

    @Override
    public String toString() {
      return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow)
          .add("stopRow", stopRow).toString();
    }
  }

  @Test
  public void testSetInputSetsSnapshotToScans() throws Exception {

    callSetInput();

    Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);

    // convert to scans we can use .equals on
    Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
    Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);

    assertEquals(expectedWithEquals, actualWithEquals);
  }

  @Test
  public void testSetInputPushesRestoreDirectories() throws Exception {
    callSetInput();

    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);

    assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
  }

  @Test
  public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
    callSetInput();

    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);

    for (Path snapshotDir : restoreDirs.values()) {
      assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
          snapshotDir.getParent());
    }
  }

  @Test
  public void testSetInputRestoresSnapshots() throws Exception {
    callSetInput();

    Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);

    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
      verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
          eq(entry.getValue()), any(FileSystem.class));
    }
  }
}
@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

@Category({ SmallTests.class })
public class TestConfigurationUtil {

  private Configuration conf;
  private Map<String, String> keyValues;
  private String key;

  @Before
  public void setUp() throws Exception {
    this.conf = new Configuration();
    this.keyValues = ImmutableMap.of("k1", "v1", "k2", "v2");
    this.key = "my_conf_key";
  }

  public void callSetKeyValues() {
    ConfigurationUtil.setKeyValues(conf, key, keyValues.entrySet());
  }

  public List<Map.Entry<String, String>> callGetKeyValues() {
    return ConfigurationUtil.getKeyValues(conf, key);
  }

  @Test
  public void testGetAndSetKeyValuesWithValues() throws Exception {
    callSetKeyValues();
    assertEquals(Lists.newArrayList(this.keyValues.entrySet()), callGetKeyValues());
  }

  @Test
  public void testGetKeyValuesWithUnsetKey() throws Exception {
    assertNull(callGetKeyValues());
  }

}