HBASE-11326 Use an InputFormat for ExportSnapshot

Matteo Bertozzi 2014-06-12 09:06:00 +01:00
parent 95a7e72302
commit 8064bd4fff
2 changed files with 212 additions and 130 deletions
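
In short, the patch replaces the pre-staged SequenceFile inputs (see the removed getInputFolderPath/createInputFiles below) with a custom InputFormat that computes its splits directly from the snapshot manifest. Condensed from runCopyJob in this diff, and assuming the snapshot name and directory are already resolved, the job wiring now amounts to roughly the following (ExportSnapshotInputFormat and ExportMapper are private nested classes, so this only compiles inside ExportSnapshot itself):

  Configuration conf = getConf();
  conf.set(CONF_SNAPSHOT_NAME, snapshotName);           // "snapshot.export.format.snapshot.name"
  conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());  // "snapshot.export.format.snapshot.dir"
  if (mappers > 0) {
    conf.setInt(CONF_NUM_SPLITS, mappers);              // "snapshot.export.format.splits"
    conf.setInt(MR_NUM_MAPS, mappers);                  // "mapreduce.job.maps"
  }
  Job job = new Job(conf);
  job.setJobName("ExportSnapshot-" + snapshotName);
  job.setJarByClass(ExportSnapshot.class);
  job.setMapperClass(ExportMapper.class);               // the copy logic itself is unchanged
  job.setInputFormatClass(ExportSnapshotInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);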

ExportSnapshot.java

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.snapshot;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@ -62,8 +64,14 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
@ -84,6 +92,10 @@ import org.apache.hadoop.util.ToolRunner;
public final class ExportSnapshot extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
private static final String MR_NUM_MAPS = "mapreduce.job.maps";
private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
@ -456,19 +468,23 @@ public final class ExportSnapshot extends Configured implements Tool {
}
}
// ==========================================================================
// Input Format
// ==========================================================================
/**
* Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
* @return list of files referenced by the snapshot (pair of path and size)
*/
private List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final FileSystem fs,
final Path snapshotDir) throws IOException {
private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
final FileSystem fs, final Path snapshotDir) throws IOException {
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
final TableName table = TableName.valueOf(snapshotDesc.getTable());
final Configuration conf = getConf();
// Get snapshot files
LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
new SnapshotReferenceUtil.SnapshotVisitor() {
@Override
@ -486,7 +502,12 @@ public final class ExportSnapshot extends Configured implements Tool {
.setHfile(path.toString())
.build();
long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
long size;
if (storeFile.hasFileSize()) {
size = storeFile.getFileSize();
} else {
size = new HFileLink(conf, path).getFileStatus(fs).getLen();
}
files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
}
}
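
The change above prefers the file size recorded in the snapshot manifest and only falls back to a live FileStatus lookup when the field is absent, saving one filesystem round trip per store file. A minimal sketch of the same fallback idiom, using a hypothetical helper rather than the HFileLink machinery of the real code:

  // Hypothetical helper mirroring the fallback above: trust the size captured at
  // snapshot time when present, otherwise ask the filesystem (one extra RPC per file).
  static long resolveFileSize(Long recordedSize, FileSystem fs, Path path) throws IOException {
    if (recordedSize != null) {
      return recordedSize.longValue();
    }
    return fs.getFileStatus(path).getLen();
  }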
@ -516,7 +537,7 @@ public final class ExportSnapshot extends Configured implements Tool {
* and then each group fetches the biggest file available, iterating through the groups
* alternating the direction.
*/
static List<List<SnapshotFileInfo>> getBalancedSplits(
static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
// Sort files by size, from small to big
Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
@ -527,18 +548,19 @@ public final class ExportSnapshot extends Configured implements Tool {
});
// create balanced groups
List<List<SnapshotFileInfo>> fileGroups = new LinkedList<List<SnapshotFileInfo>>();
List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
long[] sizeGroups = new long[ngroups];
int hi = files.size() - 1;
int lo = 0;
List<SnapshotFileInfo> group;
List<Pair<SnapshotFileInfo, Long>> group;
int dir = 1;
int g = 0;
while (hi >= lo) {
if (g == fileGroups.size()) {
group = new LinkedList<SnapshotFileInfo>();
group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
fileGroups.add(group);
} else {
group = fileGroups.get(g);
@ -548,7 +570,7 @@ public final class ExportSnapshot extends Configured implements Tool {
// add the hi one
sizeGroups[g] += fileInfo.getSecond();
group.add(fileInfo.getFirst());
group.add(fileInfo);
// change direction when at the end or the beginning
g += dir;
@ -570,85 +592,168 @@ public final class ExportSnapshot extends Configured implements Tool {
return fileGroups;
}
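
To make the balancing concrete: 21 files of sizes 0..20 split into 5 groups (the case exercised in TestExportSnapshot further down) give every group a total of 42. A self-contained sketch of the same zig-zag strategy applied to plain sizes, with hypothetical names:

  // Standalone sketch of the zig-zag packing above: sort ascending, then hand the
  // largest remaining item to each group in turn, reversing direction at the ends.
  static long[] balancedGroupSizes(long[] sizes, int ngroups) {
    long[] sorted = sizes.clone();
    java.util.Arrays.sort(sorted);                       // small to big, as in getBalancedSplits
    long[] totals = new long[ngroups];
    int hi = sorted.length - 1;
    int g = 0;
    int dir = 1;
    while (hi >= 0) {
      totals[g] += sorted[hi--];                         // biggest remaining file goes to group g
      g += dir;
      if (g == ngroups) { g = ngroups - 1; dir = -1; }   // bounce off the last group
      else if (g < 0)   { g = 0;           dir = 1;  }   // bounce off the first group
    }
    return totals;   // sizes 0..20 with ngroups=5 -> {42, 42, 42, 42, 42}
  }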
private static Path getInputFolderPath(Configuration conf)
throws IOException, InterruptedException {
Path stagingDir = JobUtil.getStagingDir(conf);
return new Path(stagingDir, INPUT_FOLDER_PREFIX +
String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
@Override
public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
TaskAttemptContext tac) throws IOException, InterruptedException {
return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
}
/**
* Create the input files, with the path to copy, for the MR job.
* Each input file contains n files, and each input file has a similar amount of data to copy.
* The number of input files created is based on the number of mappers provided as an argument
* and the number of files to copy.
*/
private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath,
final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, int mappers)
throws IOException, InterruptedException {
FileSystem fs = inputFolderPath.getFileSystem(conf);
LOG.debug("Input folder location: " + inputFolderPath);
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
List<List<SnapshotFileInfo>> splits = getBalancedSplits(snapshotFiles, mappers);
Path[] inputFiles = new Path[splits.size()];
BytesWritable key = new BytesWritable();
for (int i = 0; i < inputFiles.length; i++) {
List<SnapshotFileInfo> files = splits.get(i);
inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
BytesWritable.class, NullWritable.class);
LOG.debug("Input split: " + i);
try {
for (SnapshotFileInfo file: files) {
byte[] pbFileInfo = file.toByteArray();
key.set(pbFileInfo, 0, pbFileInfo.length);
writer.append(key, NullWritable.get());
List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
if (mappers == 0 && snapshotFiles.size() > 0) {
mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, snapshotFiles.size());
conf.setInt(CONF_NUM_SPLITS, mappers);
conf.setInt(MR_NUM_MAPS, mappers);
}
} finally {
writer.close();
List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
List<InputSplit> splits = new ArrayList(groups.size());
for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
splits.add(new ExportSnapshotInputSplit(files));
}
return splits;
}
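
When CONF_NUM_SPLITS is not set, the split count is derived from the number of snapshot files and CONF_MAP_GROUP, which defaults to 10 elsewhere in this class. A worked example of that arithmetic, assuming the default group size:

  // Worked example of the default mapper count computed in getSplits() above.
  int nfiles = 25;                           // snapshotFiles.size()
  int groupSize = 10;                        // conf.getInt(CONF_MAP_GROUP, 10)
  int nmappers = 1 + (nfiles / groupSize);   // 1 + 2 = 3 mappers
  nmappers = Math.min(nmappers, nfiles);     // still 3; the cap only bites for tiny group sizes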
private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
private List<Pair<BytesWritable, Long>> files;
private long length;
public ExportSnapshotInputSplit() {
this.files = null;
}
public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
this.files = new ArrayList(snapshotFiles.size());
for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
this.files.add(new Pair<BytesWritable, Long>(
new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
this.length += fileInfo.getSecond();
}
}
return inputFiles;
private List<Pair<BytesWritable, Long>> getSplitKeys() {
return files;
}
@Override
public long getLength() throws IOException, InterruptedException {
return length;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[] {};
}
@Override
public void readFields(DataInput in) throws IOException {
int count = in.readInt();
files = new ArrayList<Pair<BytesWritable, Long>>(count);
length = 0;
for (int i = 0; i < count; ++i) {
BytesWritable fileInfo = new BytesWritable();
fileInfo.readFields(in);
long size = in.readLong();
files.add(new Pair<BytesWritable, Long>(fileInfo, size));
length += size;
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(files.size());
for (final Pair<BytesWritable, Long> fileInfo: files) {
fileInfo.getFirst().write(out);
out.writeLong(fileInfo.getSecond());
}
}
}
private static class ExportSnapshotRecordReader
extends RecordReader<BytesWritable, NullWritable> {
private final List<Pair<BytesWritable, Long>> files;
private long totalSize = 0;
private long procSize = 0;
private int index = -1;
ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
this.files = files;
for (Pair<BytesWritable, Long> fileInfo: files) {
totalSize += fileInfo.getSecond();
}
}
@Override
public void close() { }
@Override
public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
@Override
public NullWritable getCurrentValue() { return NullWritable.get(); }
@Override
public float getProgress() { return (float)procSize / totalSize; }
@Override
public void initialize(InputSplit split, TaskAttemptContext tac) { }
@Override
public boolean nextKeyValue() {
if (index >= 0) {
procSize += files.get(index).getSecond();
}
return(++index < files.size());
}
}
}
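
The write/readFields pair above defines a simple wire format for the split: an int entry count, then for each file a BytesWritable holding the serialized SnapshotFileInfo followed by a long size. A self-contained round trip of that format through Hadoop's in-memory org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer, with placeholder bytes standing in for a real SnapshotFileInfo:

  // Round-trips the wire format used by ExportSnapshotInputSplit above.
  DataOutputBuffer out = new DataOutputBuffer();
  byte[] placeholder = new byte[] { 1, 2, 3 };   // a real split stores SnapshotFileInfo.toByteArray()
  out.writeInt(1);                               // entry count
  new BytesWritable(placeholder).write(out);     // file-info bytes
  out.writeLong(1024L);                          // file size

  DataInputBuffer in = new DataInputBuffer();
  in.reset(out.getData(), out.getLength());
  int count = in.readInt();                      // 1
  BytesWritable fileInfo = new BytesWritable();
  fileInfo.readFields(in);                       // the 3 placeholder bytes
  long size = in.readLong();                     // 1024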
// ==========================================================================
// Tool
// ==========================================================================
/**
* Run the Map-Reduce job to perform the file copy.
*/
private void runCopyJob(final Path inputRoot, final Path outputRoot,
final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, final boolean verifyChecksum,
final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
final String filesUser, final String filesGroup, final int filesMode,
final int mappers, final int bandwidthMB)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
if (mappers > 0) {
conf.setInt(CONF_NUM_SPLITS, mappers);
conf.setInt(MR_NUM_MAPS, mappers);
}
conf.setInt(CONF_FILES_MODE, filesMode);
conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
conf.set(CONF_INPUT_ROOT, inputRoot.toString());
conf.setInt("mapreduce.job.maps", mappers);
conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
conf.set(CONF_SNAPSHOT_NAME, snapshotName);
conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
Job job = new Job(conf);
job.setJobName("ExportSnapshot");
job.setJobName("ExportSnapshot-" + snapshotName);
job.setJarByClass(ExportSnapshot.class);
TableMapReduceUtil.addDependencyJars(job);
job.setMapperClass(ExportMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setInputFormatClass(ExportSnapshotInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setMapSpeculativeExecution(false);
job.setNumReduceTasks(0);
// Create MR Input
Path inputFolderPath = getInputFolderPath(conf);
for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) {
LOG.debug("Add Input Path=" + path);
SequenceFileInputFormat.addInputPath(job, path);
}
try {
// Acquire the delegation Tokens
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { inputRoot, outputRoot }, conf);
@ -659,14 +764,6 @@ public final class ExportSnapshot extends Configured implements Tool {
// when it will be available on all the supported versions.
throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
}
} finally {
// Remove MR Input
try {
inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
} catch (IOException e) {
LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
}
}
}
private void verifySnapshot(final Configuration baseConf,
@ -698,11 +795,11 @@ public final class ExportSnapshot extends Configured implements Tool {
int mappers = 0;
Configuration conf = getConf();
Path inputRoot = FSUtils.getRootDir(conf);
// Process command line args
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
try {
if (cmd.equals("-snapshot")) {
snapshotName = args[++i];
} else if (cmd.equals("-target")) {
@ -710,10 +807,7 @@ public final class ExportSnapshot extends Configured implements Tool {
} else if (cmd.equals("-copy-to")) {
outputRoot = new Path(args[++i]);
} else if (cmd.equals("-copy-from")) {
Path sourceDir = new Path(args[++i]);
URI defaultFs = sourceDir.getFileSystem(conf).getUri();
FSUtils.setFsDefault(conf, new Path(defaultFs));
FSUtils.setRootDir(conf, sourceDir);
inputRoot = new Path(args[++i]);
} else if (cmd.equals("-no-checksum-verify")) {
verifyChecksum = false;
} else if (cmd.equals("-no-target-verify")) {
@ -736,9 +830,6 @@ public final class ExportSnapshot extends Configured implements Tool {
System.err.println("UNEXPECTED: " + cmd);
printUsageAndExit();
}
} catch (IOException e) {
printUsageAndExit();
}
}
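
For reference, a hypothetical programmatic invocation using only options parsed above; the destination URI is made up, and a command-line run through the hbase script drives the same Tool interface:

  // Hypothetical invocation; flag names match the parsing loop above.
  int exitCode = ToolRunner.run(HBaseConfiguration.create(), new ExportSnapshot(),
      new String[] {
        "-snapshot", "my_snapshot",
        "-copy-to", "hdfs://backup-cluster:8020/hbase",   // made-up destination root
        "-no-checksum-verify"
      });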
// Check user options
@ -756,7 +847,6 @@ public final class ExportSnapshot extends Configured implements Tool {
targetName = snapshotName;
}
Path inputRoot = FSUtils.getRootDir(conf);
FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
@ -800,14 +890,6 @@ public final class ExportSnapshot extends Configured implements Tool {
}
}
// Step 0 - Extract snapshot files to copy
LOG.info("Loading Snapshot hfile list");
final List<Pair<SnapshotFileInfo, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
if (mappers == 0 && files.size() > 0) {
mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, files.size());
}
// Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
// The snapshot references must be copied before the hfiles otherwise the cleaner
// will remove them because they are unreferenced.
@ -833,13 +915,8 @@ public final class ExportSnapshot extends Configured implements Tool {
// The snapshot references must be copied before the files otherwise the files gets removed
// by the HFileArchiver, since they have no references.
try {
if (files.size() == 0) {
LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
} else {
runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
filesUser, filesGroup, filesMode, mappers, bandwidthMB);
}
LOG.info("Finalize the Snapshot Export");
if (!skipTmp) {

TestExportSnapshot.java

@ -163,26 +163,31 @@ public class TestExportSnapshot {
// group 2: 18, 13, 8, 3 (total size: 42)
// group 3: 17, 12, 7, 4 (total size: 42)
// group 4: 16, 11, 6, 5 (total size: 42)
List<List<SnapshotFileInfo>> splits = ExportSnapshot.getBalancedSplits(files, 5);
List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
assertEquals(5, splits.size());
String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"};
verifyBalanceSplit(splits.get(0), split0);
verifyBalanceSplit(splits.get(0), split0, 42);
String[] split1 = new String[] {"file-19", "file-12", "file-9", "file-2"};
verifyBalanceSplit(splits.get(1), split1);
verifyBalanceSplit(splits.get(1), split1, 42);
String[] split2 = new String[] {"file-18", "file-13", "file-8", "file-3"};
verifyBalanceSplit(splits.get(2), split2);
verifyBalanceSplit(splits.get(2), split2, 42);
String[] split3 = new String[] {"file-17", "file-14", "file-7", "file-4"};
verifyBalanceSplit(splits.get(3), split3);
verifyBalanceSplit(splits.get(3), split3, 42);
String[] split4 = new String[] {"file-16", "file-15", "file-6", "file-5"};
verifyBalanceSplit(splits.get(4), split4);
verifyBalanceSplit(splits.get(4), split4, 42);
}
private void verifyBalanceSplit(final List<SnapshotFileInfo> split, final String[] expected) {
private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
final String[] expected, final long expectedSize) {
assertEquals(expected.length, split.size());
long totalSize = 0;
for (int i = 0; i < expected.length; ++i) {
assertEquals(expected[i], split.get(i).getHfile());
Pair<SnapshotFileInfo, Long> fileInfo = split.get(i);
assertEquals(expected[i], fileInfo.getFirst().getHfile());
totalSize += fileInfo.getSecond();
}
assertEquals(expectedSize, totalSize);
}
/**