diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
index 0aeb8df396d..7d618581001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
@@ -243,6 +243,17 @@ public class SnapshotDescriptionUtils {
         snapshot.getName());
   }
 
+  /**
+   * Get the directory to build a snapshot, before it is finalized
+   * @param snapshotName name of the snapshot
+   * @param rootDir root directory of the hbase installation
+   * @return {@link Path} where one can build a snapshot
+   */
+  public static Path getWorkingSnapshotDir(String snapshotName, final Path rootDir) {
+    return getCompletedSnapshotDir(new Path(getSnapshotsDir(rootDir), SNAPSHOT_TMP_DIR_NAME),
+      snapshotName);
+  }
+
   /**
    * Get the directory to store the snapshot instance
    * @param snapshotsDir hbase-global directory for storing all snapshots
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java
new file mode 100644
index 00000000000..6e943d5270e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown when a snapshot could not be exported due to an error during the operation.
+ */
+@InterfaceAudience.Public
+@SuppressWarnings("serial")
+public class ExportSnapshotException extends HBaseSnapshotException {
+
+  /**
+   * @param msg message describing the exception
+   */
+  public ExportSnapshotException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param message message describing the exception
+   * @param e cause
+   */
+  public ExportSnapshotException(String message, Exception e) {
+    super(message, e);
+  }
+}
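
(A quick note on the getWorkingSnapshotDir() helper added above: assuming SNAPSHOT_TMP_DIR_NAME is ".tmp" and getSnapshotsDir() resolves to <rootDir>/.snapshot, the directory names used elsewhere in this patch, a caller would see the path below. WorkingDirExample and its values are illustrative only, not part of the patch.)

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;

    public class WorkingDirExample {
      public static void main(String[] args) {
        Path rootDir = new Path("hdfs://namenode/hbase");  // hypothetical HBase root
        Path working = SnapshotDescriptionUtils.getWorkingSnapshotDir("MySnapshot", rootDir);
        // Assuming the ".snapshot" / ".tmp" names above, this prints:
        //   hdfs://namenode/hbase/.snapshot/.tmp/MySnapshot
        System.out.println(working);
      }
    }
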
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java
new file mode 100644
index 00000000000..525821e0592
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot.tool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.mapreduce.JobUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.snapshot.exception.ExportSnapshotException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Export the specified snapshot to a given FileSystem.
+ *
+ * The .snapshot/name folder is copied to the destination cluster,
+ * and then all the hfiles/hlogs are copied, via a Map-Reduce job, into the .archive/
+ * location of the destination. When everything is done, the second cluster can restore the snapshot.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class ExportSnapshot extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
+
+  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
+  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
+  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
+  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
+  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
+  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+
+  private static final String INPUT_FOLDER_PREFIX = "export-files.";
+
+  // Export Map-Reduce Counters, to keep track of the progress
+  public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED };
+
+  private static class ExportMapper
+      extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
+    final static int REPORT_SIZE = 1 * 1024 * 1024;
+    final static int BUFFER_SIZE = 64 * 1024;
+
+    private boolean verifyChecksum;
+    private String filesGroup;
+    private String filesUser;
+    private short filesMode;
+
+    private FileSystem outputFs;
+    private Path outputArchive;
+    private Path outputRoot;
+
+    private FileSystem inputFs;
+    private Path inputArchive;
+    private Path inputRoot;
+
+    @Override
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
+
+      filesGroup = conf.get(CONF_FILES_GROUP);
+      filesUser = conf.get(CONF_FILES_USER);
+      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
+      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
+      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
+
+      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+      try {
+        inputFs = FileSystem.get(inputRoot.toUri(), conf);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e);
+      }
+
+      try {
+        outputFs = FileSystem.get(outputRoot.toUri(), conf);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not get the output FileSystem with root=" + outputRoot, e);
+      }
+    }
+
+    @Override
+    public void cleanup(Context context) {
+      if (outputFs != null) {
+        try {
+          outputFs.close();
+        } catch (IOException e) {
+          LOG.error("Error closing output FileSystem", e);
+        }
+      }
+
+      if (inputFs != null) {
+        try {
+          inputFs.close();
+        } catch (IOException e) {
+          LOG.error("Error closing input FileSystem", e);
+        }
+      }
+    }
+
+    @Override
+    public void map(Text key, NullWritable value, Context context)
+        throws InterruptedException, IOException {
+      Path inputPath = new Path(key.toString());
+      Path outputPath = getOutputPath(inputPath);
+
+      LOG.info("copy file input=" + inputPath + " output=" + outputPath);
+      if (copyFile(context, inputPath, outputPath)) {
+        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
+      }
+    }
+
+    /**
+     * Returns the location where the inputPath will be copied.
+     *  - hfiles are encoded as hfile links hfile-region-table
+     *  - logs are encoded as serverName/logName
+     */
+    private Path getOutputPath(final Path inputPath) throws IOException {
+      Path path;
+      if (HFileLink.isHFileLink(inputPath)) {
+        String family = inputPath.getParent().getName();
+        String table = HFileLink.getReferencedTableName(inputPath.getName());
+        String region = HFileLink.getReferencedRegionName(inputPath.getName());
+        String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
+        path = new Path(table, new Path(region, new Path(family, hfile)));
+      } else if (isHLogLinkPath(inputPath)) {
+        String logName = inputPath.getName();
+        path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
+      } else {
+        path = inputPath;
+      }
+      return new Path(outputArchive, path);
+    }
+
+    private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
+        throws IOException {
+      FSDataInputStream in = openSourceFile(inputPath);
+      if (in == null) {
+        context.getCounter(Counter.MISSING_FILES).increment(1);
+        return false;
+      }
+
+      try {
+        // Verify if the input file exists
+        FileStatus inputStat = getFileStatus(inputFs, inputPath);
+        if (inputStat == null) return false;
+
+        // Verify if the output file exists and is the same that we want to copy
+        FileStatus outputStat = getFileStatus(outputFs, outputPath);
+        if (outputStat != null && sameFile(inputStat, outputStat)) {
+          LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
+          return true;
+        }
+
+        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
+
+        // Ensure that the output folder is there and copy the file
+        outputFs.mkdirs(outputPath.getParent());
+        FSDataOutputStream out = outputFs.create(outputPath, true);
+        try {
+          if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
+            return false;
+        } finally {
+          out.close();
+        }
+
+        // Preserve attributes
+        return preserveAttributes(outputPath, inputStat);
+      } finally {
+        in.close();
+      }
+    }
+
+    /**
+     * Preserve the file attributes selected by the user, copying them from the source file
+     */
+    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
+      FileStatus stat;
+      try {
+        stat = outputFs.getFileStatus(path);
+      } catch (IOException e) {
+        LOG.warn("Unable to get the status for file=" + path);
+        return false;
+      }
+
+      try {
+        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
+          outputFs.setPermission(path, new FsPermission(filesMode));
+        } else if (!stat.getPermission().equals(refStat.getPermission())) {
+          outputFs.setPermission(path, refStat.getPermission());
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to set the permission for file=" + path, e);
+        return false;
+      }
+
+      try {
+        String user = (filesUser != null) ? filesUser : refStat.getOwner();
+        String group = (filesGroup != null) ?
+          filesGroup : refStat.getGroup();
+        if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
+          outputFs.setOwner(path, user, group);
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to set the owner/group for file=" + path, e);
+        return false;
+      }
+
+      return true;
+    }
+
+    private boolean copyData(final Context context,
+        final Path inputPath, final FSDataInputStream in,
+        final Path outputPath, final FSDataOutputStream out,
+        final long inputFileSize) {
+      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
+                                   " (%.3f%%) from " + inputPath + " to " + outputPath;
+
+      try {
+        byte[] buffer = new byte[BUFFER_SIZE];
+        long totalBytesWritten = 0;
+        int reportBytes = 0;
+        int bytesRead;
+
+        while ((bytesRead = in.read(buffer)) > 0) {
+          out.write(buffer, 0, bytesRead);
+          totalBytesWritten += bytesRead;
+          reportBytes += bytesRead;
+
+          if (reportBytes >= REPORT_SIZE) {
+            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+            context.setStatus(String.format(statusMessage,
+                              StringUtils.humanReadableInt(totalBytesWritten),
+                              (totalBytesWritten/(float)inputFileSize) * 100.0f));
+            reportBytes = 0;
+          }
+        }
+
+        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+        context.setStatus(String.format(statusMessage,
+                          StringUtils.humanReadableInt(totalBytesWritten),
+                          (totalBytesWritten/(float)inputFileSize) * 100.0f));
+
+        // Verify that the written size matches
+        if (totalBytesWritten != inputFileSize) {
+          LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
+                    " expected=" + inputFileSize + " for file=" + inputPath);
+          context.getCounter(Counter.COPY_FAILED).increment(1);
+          return false;
+        }
+
+        return true;
+      } catch (IOException e) {
+        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
+        context.getCounter(Counter.COPY_FAILED).increment(1);
+        return false;
+      }
+    }
+
+    private FSDataInputStream openSourceFile(final Path path) {
+      try {
+        if (HFileLink.isHFileLink(path)) {
+          return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
+        } else if (isHLogLinkPath(path)) {
+          String serverName = path.getParent().getName();
+          String logName = path.getName();
+          return new HLogLink(inputRoot, serverName, logName).open(inputFs);
+        }
+        return inputFs.open(path);
+      } catch (IOException e) {
+        LOG.error("Unable to open source file=" + path, e);
+        return null;
+      }
+    }
+
+    private FileStatus getFileStatus(final FileSystem fs, final Path path) {
+      try {
+        if (HFileLink.isHFileLink(path)) {
+          Path refPath = HFileLink.getReferencedPath(fs, inputRoot, inputArchive, path);
+          return fs.getFileStatus(refPath);
+        } else if (isHLogLinkPath(path)) {
+          String serverName = path.getParent().getName();
+          String logName = path.getName();
+          return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs);
+        }
+        return fs.getFileStatus(path);
+      } catch (IOException e) {
+        LOG.warn("Unable to get the status for file=" + path);
+        return null;
+      }
+    }
+
+    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
+      try {
+        return fs.getFileChecksum(path);
+      } catch (IOException e) {
+        LOG.warn("Unable to get checksum for file=" + path, e);
+        return null;
+      }
+    }
+
+    /**
+     * Check if the two files are equal by looking at the file length,
+     * and at the checksum (if the user has specified the verifyChecksum flag).
+     */
+    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
+      // Not matching length
+      if (inputStat.getLen() != outputStat.getLen()) return false;
+
+      // Mark files as equal, since the user asked for no checksum verification
+      if (!verifyChecksum) return true;
+
+      // If checksums are not available, files are not the same.
+      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
+      if (inChecksum == null) return false;
+
+      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
+      if (outChecksum == null) return false;
+
+      return inChecksum.equals(outChecksum);
+    }
+
+    /**
+     * HLog files are encoded as serverName/logName
+     * and since all the other files should be in /hbase/table/..path..
+     * we can rely on the depth, for now.
+     */
+    private static boolean isHLogLinkPath(final Path path) {
+      return path.depth() == 2;
+    }
+  }
+
+  /**
+   * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
+   * @return list of files referenced by the snapshot (pair of path and size)
+   */
+  private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
+      throws IOException {
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+    final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
+    final String table = snapshotDesc.getTable();
+    final Configuration conf = getConf();
+
+    // Get snapshot files
+    SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
+      new SnapshotReferenceUtil.FileVisitor() {
+        public void storeFile (final String region, final String family, final String hfile)
+            throws IOException {
+          Path path = new Path(family, HFileLink.createHFileLinkName(table, region, hfile));
+          long size = fs.getFileStatus(HFileLink.getReferencedPath(conf, fs, path)).getLen();
+          files.add(new Pair<Path, Long>(path, size));
+        }
+
+        public void recoveredEdits (final String region, final String logfile)
+            throws IOException {
+          // copied with the snapshot references
+        }
+
+        public void logFile (final String server, final String logfile)
+            throws IOException {
+          long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
+          files.add(new Pair<Path, Long>(new Path(server, logfile), size));
+        }
+    });
+
+    return files;
+  }
+
+  /**
+   * Given a list of file paths and sizes, create around ngroups groups that are
+   * as balanced as possible. The groups created will have similar amounts of bytes.
+   * <p>
+   * The algorithm used is pretty straightforward: the file list is sorted by size,
+   * and then each group fetches the biggest file available, iterating through the groups
+   * and alternating the direction.
+   */
+  static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
+    // Sort files by size, from small to big
+    Collections.sort(files, new Comparator<Pair<Path, Long>>() {
+      public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
+        long r = a.getSecond() - b.getSecond();
+        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
+      }
+    });
+
+    // create balanced groups
+    List<List<Path>> fileGroups = new LinkedList<List<Path>>();
+    long[] sizeGroups = new long[ngroups];
+    int hi = files.size() - 1;
+    int lo = 0;
+
+    List<Path> group;
+    int dir = 1;
+    int g = 0;
+
+    while (hi >= lo) {
+      if (g == fileGroups.size()) {
+        group = new LinkedList<Path>();
+        fileGroups.add(group);
+      } else {
+        group = fileGroups.get(g);
+      }
+
+      Pair<Path, Long> fileInfo = files.get(hi--);
+
+      // add the hi one
+      sizeGroups[g] += fileInfo.getSecond();
+      group.add(fileInfo.getFirst());
+
+      // change direction when at the end or the beginning
+      g += dir;
+      if (g == ngroups) {
+        dir = -1;
+        g = ngroups - 1;
+      } else if (g < 0) {
+        dir = 1;
+        g = 0;
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (int i = 0; i < sizeGroups.length; ++i) {
+        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
+      }
+    }
+
+    return fileGroups;
+  }
+
+  private static Path getInputFolderPath(Configuration conf)
+      throws IOException, InterruptedException {
+    Path stagingDir = JobUtil.getStagingDir(conf);
+    return new Path(stagingDir, INPUT_FOLDER_PREFIX +
+      String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
+  }
+
+  /**
+   * Create the input files, with the paths to copy, for the MR job.
+   * Each input file contains n files, and each input file has a similar amount of data to copy.
+   * The number of input files created is based on the number of mappers provided as an argument
+   * and the number of files to copy.
+   */
+  private static Path[] createInputFiles(final Configuration conf,
+      final List<Pair<Path, Long>> snapshotFiles, int mappers)
+      throws IOException, InterruptedException {
+    Path inputFolderPath = getInputFolderPath(conf);
+    FileSystem fs = inputFolderPath.getFileSystem(conf);
+    LOG.debug("Input folder location: " + inputFolderPath);
+
+    List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
+    Path[] inputFiles = new Path[splits.size()];
+
+    Text key = new Text();
+    for (int i = 0; i < inputFiles.length; i++) {
+      List<Path> files = splits.get(i);
+      inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
+        Text.class, NullWritable.class);
+      LOG.debug("Input split: " + i);
+      try {
+        for (Path file: files) {
+          LOG.debug(file.toString());
+          key.set(file.toString());
+          writer.append(key, NullWritable.get());
+        }
+      } finally {
+        writer.close();
+      }
+    }
+
+    return inputFiles;
+  }
+
+  /**
+   * Run Map-Reduce Job to perform the files copy.
+   */
+  private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
+      final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
+      final String filesUser, final String filesGroup, final int filesMode,
+      final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = getConf();
+    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
+    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
+    conf.setInt(CONF_FILES_MODE, filesMode);
+    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
+    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
+    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
+    conf.setInt("mapreduce.job.maps", mappers);
+
+    Job job = new Job(conf);
+    job.setJobName("ExportSnapshot");
+    job.setJarByClass(ExportSnapshot.class);
+    job.setMapperClass(ExportMapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setMapSpeculativeExecution(false);
+    job.setNumReduceTasks(0);
+    for (Path path: createInputFiles(conf, snapshotFiles, mappers)) {
+      LOG.debug("Add Input Path=" + path);
+      SequenceFileInputFormat.addInputPath(job, path);
+    }
+
+    return job.waitForCompletion(true);
+  }
+
+  /**
+   * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
+   * @return 0 on success, and != 0 upon failure.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    boolean verifyChecksum = true;
+    String snapshotName = null;
+    String filesGroup = null;
+    String filesUser = null;
+    Path outputRoot = null;
+    int filesMode = 0;
+    int mappers = getConf().getInt("mapreduce.job.maps", 1);
+
+    // Process command line args
+    for (int i = 0; i < args.length; i++) {
+      String cmd = args[i];
+      try {
+        if (cmd.equals("-snapshot")) {
+          snapshotName = args[++i];
+        } else if (cmd.equals("-copy-to")) {
+          outputRoot = new Path(args[++i]);
+        } else if (cmd.equals("-no-checksum-verify")) {
+          verifyChecksum = false;
+        } else if (cmd.equals("-mappers")) {
+          mappers = Integer.parseInt(args[++i]);
+        } else if (cmd.equals("-chuser")) {
+          filesUser = args[++i];
+        } else if (cmd.equals("-chgroup")) {
+          filesGroup = args[++i];
+        } else if (cmd.equals("-chmod")) {
+          filesMode = Integer.parseInt(args[++i], 8);
+        } else if (cmd.equals("-h") || cmd.equals("--help")) {
+          printUsageAndExit();
+        } else {
+          System.err.println("UNEXPECTED: " + cmd);
+          printUsageAndExit();
+        }
+      } catch (Exception e) {
+        printUsageAndExit();
+      }
+    }
+
+    // Check user options
+    if (snapshotName == null) {
+      System.err.println("Snapshot name not provided.");
+      printUsageAndExit();
+    }
+
+    if (outputRoot == null) {
+      System.err.println("Destination file-system not provided.");
+      printUsageAndExit();
+    }
+
+    Configuration conf = getConf();
+    Path inputRoot = FSUtils.getRootDir(conf);
+    FileSystem inputFs = FileSystem.get(conf);
+    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), new Configuration());
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
+    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotName, outputRoot);
+    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, outputRoot);
+
+    // Check if the snapshot already exists
+    if (outputFs.exists(outputSnapshotDir)) {
+      System.err.println("The snapshot '" + snapshotName +
+        "' already exists in the destination: " + outputSnapshotDir);
+      return 1;
+    }
+
+    // Check if the snapshot is already in-progress
+    if (outputFs.exists(snapshotTmpDir)) {
+      System.err.println("A snapshot with the same name '" + snapshotName + "' is in-progress");
+      return 1;
+    }
+
+    // Step 0 - Extract snapshot files to copy
+    final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
+
+    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
+    // The snapshot references must be copied before the hfiles
+    // otherwise the cleaner will remove them because they are unreferenced.
+    try {
+      FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
+    } catch (IOException e) {
+      System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir +
+        " to=" + snapshotTmpDir);
+      e.printStackTrace(System.err);
+      return 1;
+    }
+
+    // Step 2 - Start MR Job to copy files
+    // The snapshot references must be copied before the files otherwise the files get removed
+    // by the HFileArchiver, since they have no references.
+    try {
+      if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
+          filesUser, filesGroup, filesMode, mappers)) {
+        throw new ExportSnapshotException("Snapshot export failed!");
+      }
+
+      // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
+      if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
+        System.err.println("Snapshot export failed!");
+        System.err.println("Unable to rename snapshot directory from=" +
+          snapshotTmpDir + " to=" + outputSnapshotDir);
+        return 1;
+      }
+
+      return 0;
+    } catch (Exception e) {
+      System.err.println("Snapshot export failed!");
+      e.printStackTrace(System.err);
+      outputFs.delete(outputSnapshotDir, true);
+      return 1;
+    }
+  }
+
+  // ExportSnapshot
+  private void printUsageAndExit() {
+    System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
+    System.err.println(" where [options] are:");
+    System.err.println("  -h|-help                Show this help and exit.");
+    System.err.println("  -snapshot NAME          Snapshot to export.");
+    System.err.println("  -copy-to NAME           Remote destination hdfs://");
+    System.err.println("  -no-checksum-verify     Do not verify checksum.");
+    System.err.println("  -chuser USERNAME        Change the owner of the files to the specified one.");
+    System.err.println("  -chgroup GROUP          Change the group of the files to the specified one.");
+    System.err.println("  -chmod MODE             Change the permission of the files to the specified one.");
+    System.err.println("  -mappers                Number of mappers to use during the copy (mapreduce.job.maps).");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println("  hbase " + getClass().getName() + " \\");
+    System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
+    System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
+    System.exit(1);
+  }
+
+  /**
+   * The guts of the {@link #main} method.
+   * Call this method to avoid the {@link #main(String[])} System.exit.
+   * @param args
+   * @return errCode
+   * @throws Exception
+   */
+  static int innerMain(final Configuration conf, final String [] args) throws Exception {
+    return ToolRunner.run(conf, new ExportSnapshot(), args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(innerMain(HBaseConfiguration.create(), args));
+  }
+}
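
(For reference, a minimal sketch of driving the tool programmatically, the same way the test below does. Since innerMain() is package-private, a driver like this would have to live in the same package; the class name, snapshot name, and destination URI are placeholders.)

    package org.apache.hadoop.hbase.snapshot.tool;

    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ExportSnapshotDriver {
      public static void main(String[] args) throws Exception {
        // Equivalent to:
        //   bin/hbase org.apache.hadoop.hbase.snapshot.tool.ExportSnapshot \
        //     -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase -mappers 16
        int errCode = ExportSnapshot.innerMain(HBaseConfiguration.create(), new String[] {
          "-snapshot", "MySnapshot",             // placeholder snapshot name
          "-copy-to", "hdfs://srv2:8082/hbase",  // placeholder destination root
          "-mappers", "16"
        });
        System.exit(errCode);
      }
    }
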
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Create a table and take a snapshot of the table used by the export test. + */ + @Before + public void setUp() throws Exception { + this.admin = TEST_UTIL.getHBaseAdmin(); + + long tid = System.currentTimeMillis(); + tableName = Bytes.toBytes("testtb-" + tid); + snapshotName = Bytes.toBytes("snaptb0-" + tid); + + // create Table + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(htd, null); + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + TEST_UTIL.loadTable(table, FAMILY); + + // take a snapshot + admin.disableTable(tableName); + admin.snapshot(snapshotName, tableName); + admin.enableTable(tableName); + } + + @After + public void tearDown() throws Exception { + this.admin.close(); + } + + /** + * Verfy the result of getBalanceSplits() method. + * The result are groups of files, used as input list for the "export" mappers. + * All the groups should have similar amount of data. + * + * The input list is a pair of file path and length. + * The getBalanceSplits() function sort it by length, + * and assign to each group a file, going back and forth through the groups. + */ + @Test + public void testBalanceSplit() throws Exception { + // Create a list of files + List> files = new ArrayList>(); + for (long i = 0; i <= 20; i++) { + files.add(new Pair(new Path("file-" + i), i)); + } + + // Create 5 groups (total size 210) + // group 0: 20, 11, 10, 1 (total size: 42) + // group 1: 19, 12, 9, 2 (total size: 42) + // group 2: 18, 13, 8, 3 (total size: 42) + // group 3: 17, 12, 7, 4 (total size: 42) + // group 4: 16, 11, 6, 5 (total size: 42) + List> splits = ExportSnapshot.getBalancedSplits(files, 5); + assertEquals(5, splits.size()); + assertEquals(Arrays.asList(new Path("file-20"), new Path("file-11"), + new Path("file-10"), new Path("file-1"), new Path("file-0")), splits.get(0)); + assertEquals(Arrays.asList(new Path("file-19"), new Path("file-12"), + new Path("file-9"), new Path("file-2")), splits.get(1)); + assertEquals(Arrays.asList(new Path("file-18"), new Path("file-13"), + new Path("file-8"), new Path("file-3")), splits.get(2)); + assertEquals(Arrays.asList(new Path("file-17"), new Path("file-14"), + new Path("file-7"), new Path("file-4")), splits.get(3)); + assertEquals(Arrays.asList(new Path("file-16"), new Path("file-15"), + new Path("file-6"), new Path("file-5")), splits.get(4)); + } + + /** + * Verify if exported snapshot and copied files matches the original one. 
+   */
+  @Test
+  public void testExportFileSystemState() throws Exception {
+    Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+    URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+
+    // Export Snapshot
+    int res = ExportSnapshot.innerMain(TEST_UTIL.getConfiguration(), new String[] {
+      "-snapshot", Bytes.toString(snapshotName),
+      "-copy-to", copyDir.toString()
+    });
+    assertEquals(0, res);
+
+    // Verify File-System state
+    FileStatus[] rootFiles = fs.listStatus(copyDir);
+    assertEquals(2, rootFiles.length);
+    for (FileStatus fileStatus: rootFiles) {
+      String name = fileStatus.getPath().getName();
+      assertTrue(fileStatus.isDir());
+      assertTrue(name.equals(".snapshot") || name.equals(".archive"));
+    }
+
+    // compare the snapshot metadata and verify the hfiles
+    final FileSystem hdfs = FileSystem.get(hdfsUri, TEST_UTIL.getConfiguration());
+    final Path snapshotDir = new Path(".snapshot", Bytes.toString(snapshotName));
+    verifySnapshot(hdfs, new Path(TEST_UTIL.getDefaultRootDirPath(), snapshotDir),
+      fs, new Path(copyDir, snapshotDir));
+    verifyArchive(fs, copyDir, Bytes.toString(snapshotName));
+
+    // Remove the exported dir
+    fs.delete(copyDir, true);
+  }
+
+  /*
+   * Verify that the snapshot folder on file-system 1 matches the one on file-system 2
+   */
+  private void verifySnapshot(final FileSystem fs1, final Path root1,
+      final FileSystem fs2, final Path root2) throws IOException {
+    assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
+  }
+
+  /*
+   * Verify that the referenced files exist and are not empty
+   */
+  private void verifyArchive(final FileSystem fs, final Path rootDir, final String snapshotName)
+      throws IOException {
+    final Path exportedSnapshot = new Path(rootDir, new Path(".snapshot", snapshotName));
+    final Path exportedArchive = new Path(rootDir, ".archive");
+    LOG.debug(listFiles(fs, exportedArchive, exportedArchive));
+    SnapshotReferenceUtil.visitReferencedFiles(fs, exportedSnapshot,
+      new SnapshotReferenceUtil.FileVisitor() {
+        public void storeFile (final String region, final String family, final String hfile)
+            throws IOException {
+          verifyNonEmptyFile(new Path(exportedArchive,
+            new Path(Bytes.toString(tableName), new Path(region, new Path(family, hfile)))));
+        }
+
+        public void recoveredEdits (final String region, final String logfile)
+            throws IOException {
+          verifyNonEmptyFile(new Path(exportedSnapshot,
+            new Path(Bytes.toString(tableName), new Path(region, logfile))));
+        }
+
+        public void logFile (final String server, final String logfile)
+            throws IOException {
+          verifyNonEmptyFile(new Path(exportedSnapshot, new Path(server, logfile)));
+        }
+
+        private void verifyNonEmptyFile(final Path path) throws IOException {
+          LOG.debug(path);
+          assertTrue(fs.exists(path));
+          assertTrue(fs.getFileStatus(path).getLen() > 0);
+        }
+    });
+  }
+
+  private Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+      throws IOException {
+    Set<String> files = new HashSet<String>();
+    int rootPrefix = root.toString().length();
+    FileStatus[] list = FSUtils.listStatus(fs, dir);
+    if (list != null) {
+      for (FileStatus fstat: list) {
+        LOG.debug(fstat.getPath());
+        if (fstat.isDir()) {
+          files.addAll(listFiles(fs, root, fstat.getPath()));
+        } else {
+          files.add(fstat.getPath().toString().substring(rootPrefix));
+        }
+      }
+    }
+    return files;
+  }
+}
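
(As a companion to testBalanceSplit() above, here is a self-contained sketch of just the zig-zag grouping logic used by getBalancedSplits(), re-implemented over plain long sizes so it can be run outside HBase. BalancedSplitsDemo is illustrative only; the real method also creates groups lazily, so it never emits empty groups when there are fewer files than groups.)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BalancedSplitsDemo {
      static List<List<Long>> balance(long[] sizes, int ngroups) {
        Arrays.sort(sizes);  // small to big, like the patch
        List<List<Long>> groups = new ArrayList<List<Long>>();
        for (int i = 0; i < ngroups; i++) groups.add(new ArrayList<Long>());
        int g = 0, dir = 1;
        for (int hi = sizes.length - 1; hi >= 0; hi--) {
          groups.get(g).add(sizes[hi]);  // hand out the biggest remaining file
          g += dir;
          if (g == ngroups) { dir = -1; g = ngroups - 1; }  // bounce at the end
          else if (g < 0)   { dir = 1;  g = 0; }            // bounce at the start
        }
        return groups;
      }

      public static void main(String[] args) {
        long[] sizes = new long[21];
        for (int i = 0; i <= 20; i++) sizes[i] = i;
        // Prints five groups of total size 42 each, matching testBalanceSplit():
        //   [20, 11, 10, 1, 0], [19, 12, 9, 2], [18, 13, 8, 3],
        //   [17, 14, 7, 4], [16, 15, 6, 5]
        for (List<Long> group : balance(sizes, 5)) System.out.println(group);
      }
    }
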