From 50f857944e3cc9d650d5fbe7619b26292646cc1e Mon Sep 17 00:00:00 2001 From: mbertozzi Date: Tue, 4 Mar 2014 11:19:17 +0000 Subject: [PATCH] HBASE-10622 Improve log and Exceptions in Export Snapshot git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1574031 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/snapshot/ExportSnapshot.java | 188 +++++++++++------- 1 file changed, 114 insertions(+), 74 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index fe5313a28d4..388dfb18699 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -84,6 +84,8 @@ public final class ExportSnapshot extends Configured implements Tool { private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; + private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; + private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry"; @@ -91,7 +93,7 @@ public final class ExportSnapshot extends Configured implements Tool { private static final String INPUT_FOLDER_PREFIX = "export-files."; // Export Map-Reduce Counters, to keep track of the progress - public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED }; + public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED, FILES_COPIED }; private static class ExportMapper extends Mapper { final static int REPORT_SIZE = 1 * 1024 * 1024; @@ -104,6 +106,7 @@ public final class ExportSnapshot extends Configured implements Tool { private String filesGroup; private String filesUser; private short filesMode; + private int bufferSize; private FileSystem outputFs; private Path outputArchive; @@ -114,7 +117,7 @@ public final class ExportSnapshot extends Configured implements Tool { private Path inputRoot; @Override - public void setup(Context context) { + public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); @@ -132,14 +135,19 @@ public final class ExportSnapshot extends Configured implements Tool { try { inputFs = FileSystem.get(inputRoot.toUri(), conf); } catch (IOException e) { - throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e); + throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); } try { outputFs = FileSystem.get(outputRoot.toUri(), conf); } catch (IOException e) { - throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e); + throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); } + + // Use the default block size of the outputFs if bigger + int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(), BUFFER_SIZE); + bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); + LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); } @Override @@ -150,7 +158,6 @@ public final class ExportSnapshot extends Configured implements Tool { LOG.info("copy file input=" + inputPath + " output=" + outputPath); copyFile(context, inputPath, outputPath); - LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); } /** @@ -206,29 +213,20 @@ public final class ExportSnapshot extends Configured implements Tool { throws IOException { injectTestFailure(context, inputPath); - FSDataInputStream in = openSourceFile(inputPath); - if (in == null) { - context.getCounter(Counter.MISSING_FILES).increment(1); - throw new FileNotFoundException(inputPath.toString()); + // Get the file information + FileStatus inputStat = getSourceFileStatus(context, inputPath); + + // Verify if the output file exists and is the same that we want to copy + if (outputFs.exists(outputPath)) { + FileStatus outputStat = outputFs.getFileStatus(outputPath); + if (outputStat != null && sameFile(inputStat, outputStat)) { + LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file."); + return; + } } + FSDataInputStream in = openSourceFile(context, inputPath); try { - // Verify if the input file exists - FileStatus inputStat = getFileStatus(inputFs, inputPath); - if (inputStat == null) { - context.getCounter(Counter.MISSING_FILES).increment(1); - throw new FileNotFoundException(inputPath.toString()); - } - - // Verify if the output file exists and is the same that we want to copy - if (outputFs.exists(outputPath)) { - FileStatus outputStat = outputFs.getFileStatus(outputPath); - if (sameFile(inputStat, outputStat)) { - LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file."); - return; - } - } - context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); // Ensure that the output folder is there and copy the file @@ -240,15 +238,22 @@ public final class ExportSnapshot extends Configured implements Tool { out.close(); } - // Preserve attributes - preserveAttributes(outputPath, inputStat); + // Try to Preserve attributes + if (!preserveAttributes(outputPath, inputStat)) { + LOG.warn("You may have to run manually chown on: " + outputPath); + } } finally { in.close(); } } /** - * Preserve the files attribute selected by the user copying them from the source file + * Try to Preserve the files attribute selected by the user copying them from the source file + * This is only required when you are exporting as a different user than "hbase" or on a system + * that doesn't have the "hbase" user. + * + * This is not considered a blocking failure since the user can force a chmod with the user + * that knows is available on the system. */ private boolean preserveAttributes(final Path path, final FileStatus refStat) { FileStatus stat; @@ -266,38 +271,47 @@ public final class ExportSnapshot extends Configured implements Tool { outputFs.setPermission(path, refStat.getPermission()); } } catch (IOException e) { - LOG.error("Unable to set the permission for file=" + path, e); + LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); return false; } - try { - String user = (filesUser != null) ? filesUser : refStat.getOwner(); - String group = (filesGroup != null) ? filesGroup : refStat.getGroup(); - if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { - outputFs.setOwner(path, user, group); + String user = stringIsNotEmpty(filesUser) ? filesUser : refStat.getOwner(); + String group = stringIsNotEmpty(filesGroup) ? filesGroup : refStat.getGroup(); + if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { + try { + if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { + outputFs.setOwner(path, user, group); + } + } catch (IOException e) { + LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); + LOG.warn("The user/group may not exist on the destination cluster: user=" + + user + " group=" + group); + return false; } - } catch (IOException e) { - LOG.error("Unable to set the owner/group for file=" + path, e); - return false; } return true; } + private boolean stringIsNotEmpty(final String str) { + return str != null && str.length() > 0; + } + private void copyData(final Context context, final Path inputPath, final FSDataInputStream in, final Path outputPath, final FSDataOutputStream out, final long inputFileSize) throws IOException { final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + - " (%.3f%%)"; + " (%.1f%%)"; try { - byte[] buffer = new byte[BUFFER_SIZE]; + byte[] buffer = new byte[bufferSize]; long totalBytesWritten = 0; int reportBytes = 0; int bytesRead; + long stime = System.currentTimeMillis(); while ((bytesRead = in.read(buffer)) > 0) { out.write(buffer, 0, bytesRead); totalBytesWritten += bytesRead; @@ -307,16 +321,17 @@ public final class ExportSnapshot extends Configured implements Tool { context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), - totalBytesWritten/(float)inputFileSize) + + (totalBytesWritten/(float)inputFileSize) * 100.0f) + " from " + inputPath + " to " + outputPath); reportBytes = 0; } } + long etime = System.currentTimeMillis(); context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), - totalBytesWritten/(float)inputFileSize) + + (totalBytesWritten/(float)inputFileSize) * 100.0f) + " from " + inputPath + " to " + outputPath); // Verify that the written size match @@ -325,6 +340,13 @@ public final class ExportSnapshot extends Configured implements Tool { " expected=" + inputFileSize + " for file=" + inputPath; throw new IOException(msg); } + + LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); + LOG.info("size=" + totalBytesWritten + + " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" + + " time=" + StringUtils.formatTimeDiff(etime, stime) + + String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0)); + context.getCounter(Counter.FILES_COPIED).increment(1); } catch (IOException e) { LOG.error("Error copying " + inputPath + " to " + outputPath, e); context.getCounter(Counter.COPY_FAILED).increment(1); @@ -332,7 +354,12 @@ public final class ExportSnapshot extends Configured implements Tool { } } - private FSDataInputStream openSourceFile(final Path path) { + /** + * Try to open the "source" file. + * Throws an IOException if the communication with the inputFs fail or + * if the file is not found. + */ + private FSDataInputStream openSourceFile(Context context, final Path path) throws IOException { try { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { return new HFileLink(inputRoot, inputArchive, path).open(inputFs); @@ -343,25 +370,30 @@ public final class ExportSnapshot extends Configured implements Tool { } return inputFs.open(path); } catch (IOException e) { + context.getCounter(Counter.MISSING_FILES).increment(1); LOG.error("Unable to open source file=" + path, e); - return null; + throw e; } } - private FileStatus getFileStatus(final FileSystem fs, final Path path) { + private FileStatus getSourceFileStatus(Context context, final Path path) throws IOException { try { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { HFileLink link = new HFileLink(inputRoot, inputArchive, path); - return link.getFileStatus(fs); + return link.getFileStatus(inputFs); } else if (isHLogLinkPath(path)) { String serverName = path.getParent().getName(); String logName = path.getName(); - return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs); + return new HLogLink(inputRoot, serverName, logName).getFileStatus(inputFs); } - return fs.getFileStatus(path); + return inputFs.getFileStatus(path); + } catch (FileNotFoundException e) { + context.getCounter(Counter.MISSING_FILES).increment(1); + LOG.error("Unable to get the status for source file=" + path, e); + throw e; } catch (IOException e) { - LOG.warn("Unable to get the status for file=" + path); - return null; + LOG.error("Unable to get the status for source file=" + path, e); + throw e; } } @@ -551,7 +583,7 @@ public final class ExportSnapshot extends Configured implements Tool { /** * Run Map-Reduce Job to perform the files copy. */ - private boolean runCopyJob(final FileSystem inputFs, final Path inputRoot, + private void runCopyJob(final FileSystem inputFs, final Path inputRoot, final FileSystem outputFs, final Path outputRoot, final List> snapshotFiles, final boolean verifyChecksum, final String filesUser, final String filesGroup, final int filesMode, @@ -588,7 +620,11 @@ public final class ExportSnapshot extends Configured implements Tool { outputFsToken.acquireDelegationToken(outputFs); // Run the MR Job - return job.waitForCompletion(true); + if (!job.waitForCompletion(true)) { + // TODO: Replace the fixed string with job.getStatus().getFailureInfo() + // when it will be available on all the supported versions. + throw new ExportSnapshotException("Copy Files Map-Reduce Job failed"); + } } finally { inputFsToken.releaseDelegationToken(); outputFsToken.releaseDelegationToken(); @@ -600,7 +636,7 @@ public final class ExportSnapshot extends Configured implements Tool { * @return 0 on success, and != 0 upon failure. */ @Override - public int run(String[] args) throws Exception { + public int run(String[] args) throws IOException { boolean verifyChecksum = true; String snapshotName = null; boolean overwrite = false; @@ -608,7 +644,7 @@ public final class ExportSnapshot extends Configured implements Tool { String filesUser = null; Path outputRoot = null; int filesMode = 0; - int mappers = getConf().getInt("mapreduce.job.maps", 1); + int mappers = 0; // Process command line args for (int i = 0; i < args.length; i++) { @@ -676,26 +712,35 @@ public final class ExportSnapshot extends Configured implements Tool { } // Check if the snapshot already in-progress - if (!overwrite && outputFs.exists(snapshotTmpDir)) { - System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress"); - System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); - System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); - return 1; + if (outputFs.exists(snapshotTmpDir)) { + if (overwrite) { + if (!outputFs.delete(snapshotTmpDir, true)) { + System.err.println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); + return 1; + } + } else { + System.err.println("A snapshot with the same name '"+ snapshotName +"' may be in-progress"); + System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); + System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); + return 1; + } } // Step 0 - Extract snapshot files to copy final List> files = getSnapshotFiles(inputFs, snapshotDir); + if (mappers == 0 && files.size() > 0) { + mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10)); + mappers = Math.min(mappers, files.size()); + } // Step 1 - Copy fs1:/.snapshot/ to fs2:/.snapshot/.tmp/ // The snapshot references must be copied before the hfiles otherwise the cleaner // will remove them because they are unreferenced. try { - FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, overwrite, conf); + FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf); } catch (IOException e) { - System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir + - " to=" + snapshotTmpDir); - e.printStackTrace(System.err); - return 1; + throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + + snapshotDir + " to=" + snapshotTmpDir); } // Step 2 - Start MR Job to copy files @@ -705,24 +750,19 @@ public final class ExportSnapshot extends Configured implements Tool { if (files.size() == 0) { LOG.warn("There are 0 store file to be copied. There may be no data in the table."); } else { - if (!runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum, - filesUser, filesGroup, filesMode, mappers)) { - throw new ExportSnapshotException("Snapshot export failed!"); - } + runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum, + filesUser, filesGroup, filesMode, mappers); } // Step 3 - Rename fs2:/.snapshot/.tmp/ fs2:/.snapshot/ if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { - System.err.println("Snapshot export failed!"); - System.err.println("Unable to rename snapshot directory from=" + - snapshotTmpDir + " to=" + outputSnapshotDir); - return 1; + throw new ExportSnapshotException("Unable to rename snapshot directory from=" + + snapshotTmpDir + " to=" + outputSnapshotDir); } return 0; } catch (Exception e) { LOG.error("Snapshot export failed", e); - System.err.println("Snapshot export failed!"); - e.printStackTrace(System.err); + outputFs.delete(snapshotTmpDir, true); outputFs.delete(outputSnapshotDir, true); return 1; } @@ -765,6 +805,6 @@ public final class ExportSnapshot extends Configured implements Tool { } public static void main(String[] args) throws Exception { - System.exit(innerMain(HBaseConfiguration.create(), args)); + System.exit(innerMain(HBaseConfiguration.create(), args)); } }