HBASE-10537 Let the ExportSnapshot mapper fail and retry on error
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1569680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2514f24468
commit
96a5ef9131
|
@ -18,12 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -81,6 +83,9 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
|
||||
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
|
||||
|
||||
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
|
||||
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
|
||||
|
||||
private static final String INPUT_FOLDER_PREFIX = "export-files.";
|
||||
|
||||
// Export Map-Reduce Counters, to keep track of the progress
|
||||
|
@ -90,6 +95,9 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
final static int REPORT_SIZE = 1 * 1024 * 1024;
|
||||
final static int BUFFER_SIZE = 64 * 1024;
|
||||
|
||||
private boolean testFailures;
|
||||
private Random random;
|
||||
|
||||
private boolean verifyChecksum;
|
||||
private String filesGroup;
|
||||
private String filesUser;
|
||||
|
@ -117,6 +125,8 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||
outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||
|
||||
testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
|
||||
|
||||
try {
|
||||
inputFs = FileSystem.get(inputRoot.toUri(), conf);
|
||||
} catch (IOException e) {
|
||||
|
@ -137,9 +147,8 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
Path outputPath = getOutputPath(inputPath);
|
||||
|
||||
LOG.info("copy file input=" + inputPath + " output=" + outputPath);
|
||||
if (copyFile(context, inputPath, outputPath)) {
|
||||
LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
|
||||
}
|
||||
copyFile(context, inputPath, outputPath);
|
||||
LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -166,25 +175,55 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
return new Path(outputArchive, path);
|
||||
}
|
||||
|
||||
private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
|
||||
/*
|
||||
* Used by TestExportSnapshot to simulate a failure
|
||||
*/
|
||||
private void injectTestFailure(final Context context, final Path inputPath)
|
||||
throws IOException {
|
||||
if (testFailures) {
|
||||
if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
|
||||
if (random == null) {
|
||||
random = new Random();
|
||||
}
|
||||
|
||||
// FLAKY-TEST-WARN: lower is better, we can get some runs without the
|
||||
// retry, but at least we reduce the number of test failures due to
|
||||
// this test exception from the same map task.
|
||||
if (random.nextFloat() < 0.05) {
|
||||
throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputPath
|
||||
+ " time=" + System.currentTimeMillis());
|
||||
}
|
||||
} else {
|
||||
context.getCounter(Counter.COPY_FAILED).increment(1);
|
||||
throw new IOException("TEST FAILURE: Unable to copy input=" + inputPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void copyFile(final Context context, final Path inputPath, final Path outputPath)
|
||||
throws IOException {
|
||||
injectTestFailure(context, inputPath);
|
||||
|
||||
FSDataInputStream in = openSourceFile(inputPath);
|
||||
if (in == null) {
|
||||
context.getCounter(Counter.MISSING_FILES).increment(1);
|
||||
return false;
|
||||
throw new FileNotFoundException(inputPath.toString());
|
||||
}
|
||||
|
||||
try {
|
||||
// Verify if the input file exists
|
||||
FileStatus inputStat = getFileStatus(inputFs, inputPath);
|
||||
if (inputStat == null) return false;
|
||||
if (inputStat == null) {
|
||||
context.getCounter(Counter.MISSING_FILES).increment(1);
|
||||
throw new FileNotFoundException(inputPath.toString());
|
||||
}
|
||||
|
||||
// Verify if the output file exists and is the same that we want to copy
|
||||
if (outputFs.exists(outputPath)) {
|
||||
FileStatus outputStat = outputFs.getFileStatus(outputPath);
|
||||
if (sameFile(inputStat, outputStat)) {
|
||||
LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -194,14 +233,13 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
outputFs.mkdirs(outputPath.getParent());
|
||||
FSDataOutputStream out = outputFs.create(outputPath, true);
|
||||
try {
|
||||
if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
|
||||
return false;
|
||||
copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
|
||||
// Preserve attributes
|
||||
return preserveAttributes(outputPath, inputStat);
|
||||
preserveAttributes(outputPath, inputStat);
|
||||
} finally {
|
||||
in.close();
|
||||
}
|
||||
|
@ -244,10 +282,11 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean copyData(final Context context,
|
||||
private void copyData(final Context context,
|
||||
final Path inputPath, final FSDataInputStream in,
|
||||
final Path outputPath, final FSDataOutputStream out,
|
||||
final long inputFileSize) {
|
||||
final long inputFileSize)
|
||||
throws IOException {
|
||||
final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
|
||||
" (%.3f%%)";
|
||||
|
||||
|
@ -280,17 +319,14 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
|
||||
// Verify that the written size match
|
||||
if (totalBytesWritten != inputFileSize) {
|
||||
LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
|
||||
" expected=" + inputFileSize + " for file=" + inputPath);
|
||||
context.getCounter(Counter.COPY_FAILED).increment(1);
|
||||
return false;
|
||||
String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
|
||||
" expected=" + inputFileSize + " for file=" + inputPath;
|
||||
throw new IOException(msg);
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error copying " + inputPath + " to " + outputPath, e);
|
||||
context.getCounter(Counter.COPY_FAILED).increment(1);
|
||||
return false;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -617,7 +653,7 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
if (outputFs.exists(snapshotTmpDir)) {
|
||||
System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress");
|
||||
System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
|
||||
System.err.println("consider removing " + snapshotTmpDir + " before retrying export");
|
||||
System.err.println("consider removing " + snapshotTmpDir + " before retrying export");
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -656,9 +692,9 @@ public final class ExportSnapshot extends Configured implements Tool {
|
|||
snapshotTmpDir + " to=" + outputSnapshotDir);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Snapshot export failed", e);
|
||||
System.err.println("Snapshot export failed!");
|
||||
e.printStackTrace(System.err);
|
||||
outputFs.delete(outputSnapshotDir, true);
|
||||
|
|
|
@ -105,16 +105,14 @@ public class TestExportSnapshot {
|
|||
emptySnapshotName = Bytes.toBytes("emptySnaptb0-" + tid);
|
||||
|
||||
// create Table
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(FAMILY));
|
||||
admin.createTable(htd, null);
|
||||
SnapshotTestingUtils.createTable(TEST_UTIL, tableName, FAMILY);
|
||||
|
||||
// Take an empty snapshot
|
||||
admin.snapshot(emptySnapshotName, tableName);
|
||||
|
||||
// Add some rows
|
||||
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||
TEST_UTIL.loadTable(table, FAMILY);
|
||||
SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 1000, FAMILY);
|
||||
|
||||
// take a snapshot
|
||||
admin.snapshot(snapshotName, tableName);
|
||||
|
@ -127,6 +125,7 @@ public class TestExportSnapshot {
|
|||
SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verfy the result of getBalanceSplits() method.
|
||||
* The result are groups of files, used as input list for the "export" mappers.
|
||||
|
@ -267,6 +266,44 @@ public class TestExportSnapshot {
|
|||
fs.delete(copyDir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that ExportSnapshot will return a failure if something fails.
|
||||
*/
|
||||
@Test
|
||||
public void testExportFailure() throws Exception {
|
||||
assertEquals(1, runExportAndInjectFailures(snapshotName, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that ExportSnapshot will succede if something fails but the retry succede.
|
||||
*/
|
||||
@Test
|
||||
public void testExportRetry() throws Exception {
|
||||
assertEquals(0, runExportAndInjectFailures(snapshotName, true));
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute the ExportSnapshot job injecting failures
|
||||
*/
|
||||
private int runExportAndInjectFailures(final byte[] snapshotName, boolean retry)
|
||||
throws Exception {
|
||||
Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
|
||||
URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
|
||||
FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
|
||||
copyDir = copyDir.makeQualified(fs);
|
||||
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setBoolean(ExportSnapshot.CONF_TEST_FAILURE, true);
|
||||
conf.setBoolean(ExportSnapshot.CONF_TEST_RETRY, retry);
|
||||
|
||||
// Export Snapshot
|
||||
int res = ExportSnapshot.innerMain(conf, new String[] {
|
||||
"-snapshot", Bytes.toString(snapshotName),
|
||||
"-copy-to", copyDir.toString()
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* verify if the snapshot folder on file-system 1 match the one on file-system 2
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue