HBASE-10622 Improve log and Exceptions in Export Snapshot

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1574031 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbertozzi 2014-03-04 11:19:17 +00:00
parent 261478a7e4
commit 50f857944e
1 changed files with 114 additions and 74 deletions

View File

@ -84,6 +84,8 @@ public final class ExportSnapshot extends Configured implements Tool {
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry"; static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
@ -91,7 +93,7 @@ public final class ExportSnapshot extends Configured implements Tool {
private static final String INPUT_FOLDER_PREFIX = "export-files."; private static final String INPUT_FOLDER_PREFIX = "export-files.";
// Export Map-Reduce Counters, to keep track of the progress // Export Map-Reduce Counters, to keep track of the progress
public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED }; public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED, FILES_COPIED };
private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> { private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
final static int REPORT_SIZE = 1 * 1024 * 1024; final static int REPORT_SIZE = 1 * 1024 * 1024;
@ -104,6 +106,7 @@ public final class ExportSnapshot extends Configured implements Tool {
private String filesGroup; private String filesGroup;
private String filesUser; private String filesUser;
private short filesMode; private short filesMode;
private int bufferSize;
private FileSystem outputFs; private FileSystem outputFs;
private Path outputArchive; private Path outputArchive;
@ -114,7 +117,7 @@ public final class ExportSnapshot extends Configured implements Tool {
private Path inputRoot; private Path inputRoot;
@Override @Override
public void setup(Context context) { public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
@ -132,14 +135,19 @@ public final class ExportSnapshot extends Configured implements Tool {
try { try {
inputFs = FileSystem.get(inputRoot.toUri(), conf); inputFs = FileSystem.get(inputRoot.toUri(), conf);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e); throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
} }
try { try {
outputFs = FileSystem.get(outputRoot.toUri(), conf); outputFs = FileSystem.get(outputRoot.toUri(), conf);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e); throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
} }
// Use the default block size of the outputFs if bigger
int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(), BUFFER_SIZE);
bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
} }
@Override @Override
@ -150,7 +158,6 @@ public final class ExportSnapshot extends Configured implements Tool {
LOG.info("copy file input=" + inputPath + " output=" + outputPath); LOG.info("copy file input=" + inputPath + " output=" + outputPath);
copyFile(context, inputPath, outputPath); copyFile(context, inputPath, outputPath);
LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
} }
/** /**
@ -206,29 +213,20 @@ public final class ExportSnapshot extends Configured implements Tool {
throws IOException { throws IOException {
injectTestFailure(context, inputPath); injectTestFailure(context, inputPath);
FSDataInputStream in = openSourceFile(inputPath); // Get the file information
if (in == null) { FileStatus inputStat = getSourceFileStatus(context, inputPath);
context.getCounter(Counter.MISSING_FILES).increment(1);
throw new FileNotFoundException(inputPath.toString());
}
try {
// Verify if the input file exists
FileStatus inputStat = getFileStatus(inputFs, inputPath);
if (inputStat == null) {
context.getCounter(Counter.MISSING_FILES).increment(1);
throw new FileNotFoundException(inputPath.toString());
}
// Verify if the output file exists and is the same that we want to copy // Verify if the output file exists and is the same that we want to copy
if (outputFs.exists(outputPath)) { if (outputFs.exists(outputPath)) {
FileStatus outputStat = outputFs.getFileStatus(outputPath); FileStatus outputStat = outputFs.getFileStatus(outputPath);
if (sameFile(inputStat, outputStat)) { if (outputStat != null && sameFile(inputStat, outputStat)) {
LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file."); LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
return; return;
} }
} }
FSDataInputStream in = openSourceFile(context, inputPath);
try {
context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
// Ensure that the output folder is there and copy the file // Ensure that the output folder is there and copy the file
@ -240,15 +238,22 @@ public final class ExportSnapshot extends Configured implements Tool {
out.close(); out.close();
} }
// Preserve attributes // Try to Preserve attributes
preserveAttributes(outputPath, inputStat); if (!preserveAttributes(outputPath, inputStat)) {
LOG.warn("You may have to run manually chown on: " + outputPath);
}
} finally { } finally {
in.close(); in.close();
} }
} }
/** /**
* Preserve the files attribute selected by the user copying them from the source file * Try to Preserve the files attribute selected by the user copying them from the source file
* This is only required when you are exporting as a different user than "hbase" or on a system
* that doesn't have the "hbase" user.
*
* This is not considered a blocking failure since the user can force a chmod with the user
* that knows is available on the system.
*/ */
private boolean preserveAttributes(final Path path, final FileStatus refStat) { private boolean preserveAttributes(final Path path, final FileStatus refStat) {
FileStatus stat; FileStatus stat;
@ -266,38 +271,47 @@ public final class ExportSnapshot extends Configured implements Tool {
outputFs.setPermission(path, refStat.getPermission()); outputFs.setPermission(path, refStat.getPermission());
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to set the permission for file=" + path, e); LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
return false; return false;
} }
String user = stringIsNotEmpty(filesUser) ? filesUser : refStat.getOwner();
String group = stringIsNotEmpty(filesGroup) ? filesGroup : refStat.getGroup();
if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
try { try {
String user = (filesUser != null) ? filesUser : refStat.getOwner();
String group = (filesGroup != null) ? filesGroup : refStat.getGroup();
if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
outputFs.setOwner(path, user, group); outputFs.setOwner(path, user, group);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to set the owner/group for file=" + path, e); LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
LOG.warn("The user/group may not exist on the destination cluster: user=" +
user + " group=" + group);
return false; return false;
} }
}
return true; return true;
} }
private boolean stringIsNotEmpty(final String str) {
return str != null && str.length() > 0;
}
private void copyData(final Context context, private void copyData(final Context context,
final Path inputPath, final FSDataInputStream in, final Path inputPath, final FSDataInputStream in,
final Path outputPath, final FSDataOutputStream out, final Path outputPath, final FSDataOutputStream out,
final long inputFileSize) final long inputFileSize)
throws IOException { throws IOException {
final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
" (%.3f%%)"; " (%.1f%%)";
try { try {
byte[] buffer = new byte[BUFFER_SIZE]; byte[] buffer = new byte[bufferSize];
long totalBytesWritten = 0; long totalBytesWritten = 0;
int reportBytes = 0; int reportBytes = 0;
int bytesRead; int bytesRead;
long stime = System.currentTimeMillis();
while ((bytesRead = in.read(buffer)) > 0) { while ((bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead); out.write(buffer, 0, bytesRead);
totalBytesWritten += bytesRead; totalBytesWritten += bytesRead;
@ -307,16 +321,17 @@ public final class ExportSnapshot extends Configured implements Tool {
context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
context.setStatus(String.format(statusMessage, context.setStatus(String.format(statusMessage,
StringUtils.humanReadableInt(totalBytesWritten), StringUtils.humanReadableInt(totalBytesWritten),
totalBytesWritten/(float)inputFileSize) + (totalBytesWritten/(float)inputFileSize) * 100.0f) +
" from " + inputPath + " to " + outputPath); " from " + inputPath + " to " + outputPath);
reportBytes = 0; reportBytes = 0;
} }
} }
long etime = System.currentTimeMillis();
context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
context.setStatus(String.format(statusMessage, context.setStatus(String.format(statusMessage,
StringUtils.humanReadableInt(totalBytesWritten), StringUtils.humanReadableInt(totalBytesWritten),
totalBytesWritten/(float)inputFileSize) + (totalBytesWritten/(float)inputFileSize) * 100.0f) +
" from " + inputPath + " to " + outputPath); " from " + inputPath + " to " + outputPath);
// Verify that the written size match // Verify that the written size match
@ -325,6 +340,13 @@ public final class ExportSnapshot extends Configured implements Tool {
" expected=" + inputFileSize + " for file=" + inputPath; " expected=" + inputFileSize + " for file=" + inputPath;
throw new IOException(msg); throw new IOException(msg);
} }
LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
LOG.info("size=" + totalBytesWritten +
" (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
" time=" + StringUtils.formatTimeDiff(etime, stime) +
String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
context.getCounter(Counter.FILES_COPIED).increment(1);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error copying " + inputPath + " to " + outputPath, e); LOG.error("Error copying " + inputPath + " to " + outputPath, e);
context.getCounter(Counter.COPY_FAILED).increment(1); context.getCounter(Counter.COPY_FAILED).increment(1);
@ -332,7 +354,12 @@ public final class ExportSnapshot extends Configured implements Tool {
} }
} }
private FSDataInputStream openSourceFile(final Path path) { /**
* Try to open the "source" file.
* Throws an IOException if the communication with the inputFs fail or
* if the file is not found.
*/
private FSDataInputStream openSourceFile(Context context, final Path path) throws IOException {
try { try {
if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
return new HFileLink(inputRoot, inputArchive, path).open(inputFs); return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
@ -343,25 +370,30 @@ public final class ExportSnapshot extends Configured implements Tool {
} }
return inputFs.open(path); return inputFs.open(path);
} catch (IOException e) { } catch (IOException e) {
context.getCounter(Counter.MISSING_FILES).increment(1);
LOG.error("Unable to open source file=" + path, e); LOG.error("Unable to open source file=" + path, e);
return null; throw e;
} }
} }
private FileStatus getFileStatus(final FileSystem fs, final Path path) { private FileStatus getSourceFileStatus(Context context, final Path path) throws IOException {
try { try {
if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
HFileLink link = new HFileLink(inputRoot, inputArchive, path); HFileLink link = new HFileLink(inputRoot, inputArchive, path);
return link.getFileStatus(fs); return link.getFileStatus(inputFs);
} else if (isHLogLinkPath(path)) { } else if (isHLogLinkPath(path)) {
String serverName = path.getParent().getName(); String serverName = path.getParent().getName();
String logName = path.getName(); String logName = path.getName();
return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs); return new HLogLink(inputRoot, serverName, logName).getFileStatus(inputFs);
} }
return fs.getFileStatus(path); return inputFs.getFileStatus(path);
} catch (FileNotFoundException e) {
context.getCounter(Counter.MISSING_FILES).increment(1);
LOG.error("Unable to get the status for source file=" + path, e);
throw e;
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to get the status for file=" + path); LOG.error("Unable to get the status for source file=" + path, e);
return null; throw e;
} }
} }
@ -551,7 +583,7 @@ public final class ExportSnapshot extends Configured implements Tool {
/** /**
* Run Map-Reduce Job to perform the files copy. * Run Map-Reduce Job to perform the files copy.
*/ */
private boolean runCopyJob(final FileSystem inputFs, final Path inputRoot, private void runCopyJob(final FileSystem inputFs, final Path inputRoot,
final FileSystem outputFs, final Path outputRoot, final FileSystem outputFs, final Path outputRoot,
final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum, final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
final String filesUser, final String filesGroup, final int filesMode, final String filesUser, final String filesGroup, final int filesMode,
@ -588,7 +620,11 @@ public final class ExportSnapshot extends Configured implements Tool {
outputFsToken.acquireDelegationToken(outputFs); outputFsToken.acquireDelegationToken(outputFs);
// Run the MR Job // Run the MR Job
return job.waitForCompletion(true); if (!job.waitForCompletion(true)) {
// TODO: Replace the fixed string with job.getStatus().getFailureInfo()
// when it will be available on all the supported versions.
throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
}
} finally { } finally {
inputFsToken.releaseDelegationToken(); inputFsToken.releaseDelegationToken();
outputFsToken.releaseDelegationToken(); outputFsToken.releaseDelegationToken();
@ -600,7 +636,7 @@ public final class ExportSnapshot extends Configured implements Tool {
* @return 0 on success, and != 0 upon failure. * @return 0 on success, and != 0 upon failure.
*/ */
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws IOException {
boolean verifyChecksum = true; boolean verifyChecksum = true;
String snapshotName = null; String snapshotName = null;
boolean overwrite = false; boolean overwrite = false;
@ -608,7 +644,7 @@ public final class ExportSnapshot extends Configured implements Tool {
String filesUser = null; String filesUser = null;
Path outputRoot = null; Path outputRoot = null;
int filesMode = 0; int filesMode = 0;
int mappers = getConf().getInt("mapreduce.job.maps", 1); int mappers = 0;
// Process command line args // Process command line args
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
@ -676,26 +712,35 @@ public final class ExportSnapshot extends Configured implements Tool {
} }
// Check if the snapshot already in-progress // Check if the snapshot already in-progress
if (!overwrite && outputFs.exists(snapshotTmpDir)) { if (outputFs.exists(snapshotTmpDir)) {
if (overwrite) {
if (!outputFs.delete(snapshotTmpDir, true)) {
System.err.println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
return 1;
}
} else {
System.err.println("A snapshot with the same name '"+ snapshotName +"' may be in-progress"); System.err.println("A snapshot with the same name '"+ snapshotName +"' may be in-progress");
System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); System.err.println("consider removing " + snapshotTmpDir + " before retrying export");
return 1; return 1;
} }
}
// Step 0 - Extract snapshot files to copy // Step 0 - Extract snapshot files to copy
final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir); final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
if (mappers == 0 && files.size() > 0) {
mappers = 1 + (files.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, files.size());
}
// Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
// The snapshot references must be copied before the hfiles otherwise the cleaner // The snapshot references must be copied before the hfiles otherwise the cleaner
// will remove them because they are unreferenced. // will remove them because they are unreferenced.
try { try {
FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, overwrite, conf); FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
} catch (IOException e) { } catch (IOException e) {
System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir + throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
" to=" + snapshotTmpDir); snapshotDir + " to=" + snapshotTmpDir);
e.printStackTrace(System.err);
return 1;
} }
// Step 2 - Start MR Job to copy files // Step 2 - Start MR Job to copy files
@ -705,24 +750,19 @@ public final class ExportSnapshot extends Configured implements Tool {
if (files.size() == 0) { if (files.size() == 0) {
LOG.warn("There are 0 store file to be copied. There may be no data in the table."); LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
} else { } else {
if (!runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum, runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum,
filesUser, filesGroup, filesMode, mappers)) { filesUser, filesGroup, filesMode, mappers);
throw new ExportSnapshotException("Snapshot export failed!");
}
} }
// Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
System.err.println("Snapshot export failed!"); throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
System.err.println("Unable to rename snapshot directory from=" +
snapshotTmpDir + " to=" + outputSnapshotDir); snapshotTmpDir + " to=" + outputSnapshotDir);
return 1;
} }
return 0; return 0;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Snapshot export failed", e); LOG.error("Snapshot export failed", e);
System.err.println("Snapshot export failed!"); outputFs.delete(snapshotTmpDir, true);
e.printStackTrace(System.err);
outputFs.delete(outputSnapshotDir, true); outputFs.delete(outputSnapshotDir, true);
return 1; return 1;
} }