HADOOP-12469. distcp should not ignore the ignoreFailures option. Contributed by Mingliang Liu.
This commit is contained in:
parent
f343d91ecc
commit
af942585a1
|
@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.hadoop.tools.DistCpConstants;
|
|||
import org.apache.hadoop.tools.DistCpOptionSwitch;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||
import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
@ -251,8 +253,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
|
||||
target, exception);
|
||||
|
||||
if (ignoreFailures && exception.getCause() instanceof
|
||||
RetriableFileCopyCommand.CopyReadException) {
|
||||
if (ignoreFailures &&
|
||||
ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) {
|
||||
incrementCounter(context, Counter.FAIL, 1);
|
||||
incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
|
||||
context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
|
||||
|
|
|
@ -392,6 +392,8 @@ public class TestCopyMapper {
|
|||
public void testIgnoreFailures() {
|
||||
doTestIgnoreFailures(true);
|
||||
doTestIgnoreFailures(false);
|
||||
doTestIgnoreFailuresDoubleWrapped(true);
|
||||
doTestIgnoreFailuresDoubleWrapped(false);
|
||||
}
|
||||
|
||||
@Test(timeout=40000)
|
||||
|
@ -800,6 +802,89 @@ public class TestCopyMapper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test covers the case where the CopyReadException is double-wrapped and
|
||||
* the mapper should be able to ignore this nested read exception.
|
||||
* @see #doTestIgnoreFailures
|
||||
*/
|
||||
private void doTestIgnoreFailuresDoubleWrapped(final boolean ignoreFailures) {
|
||||
try {
|
||||
deleteState();
|
||||
createSourceData();
|
||||
|
||||
final UserGroupInformation tmpUser = UserGroupInformation
|
||||
.createRemoteUser("guest");
|
||||
|
||||
final CopyMapper copyMapper = new CopyMapper();
|
||||
|
||||
final Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
tmpUser.doAs(new PrivilegedAction<
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context>() {
|
||||
@Override
|
||||
public Mapper<Text, CopyListingFileStatus, Text, Text>.Context
|
||||
run() {
|
||||
try {
|
||||
StubContext stubContext = new StubContext(
|
||||
getConfiguration(), null, 0);
|
||||
return stubContext.getContext();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered when get stub context", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
touchFile(SOURCE_PATH + "/src/file");
|
||||
mkdirs(TARGET_PATH);
|
||||
cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
|
||||
new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE));
|
||||
cluster.getFileSystem().setPermission(new Path(TARGET_PATH),
|
||||
new FsPermission((short)511));
|
||||
|
||||
context.getConfiguration().setBoolean(
|
||||
DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures);
|
||||
|
||||
final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
|
||||
@Override
|
||||
public FileSystem run() {
|
||||
try {
|
||||
return FileSystem.get(configuration);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception encountered when get FileSystem.", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tmpUser.doAs(new PrivilegedAction<Integer>() {
|
||||
@Override
|
||||
public Integer run() {
|
||||
try {
|
||||
copyMapper.setup(context);
|
||||
copyMapper.map(new Text("/src/file"),
|
||||
new CopyListingFileStatus(tmpFS.getFileStatus(
|
||||
new Path(SOURCE_PATH + "/src/file"))),
|
||||
context);
|
||||
Assert.assertTrue("Should have thrown an IOException if not " +
|
||||
"ignoring failures", ignoreFailures);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unexpected exception encountered. ", e);
|
||||
Assert.assertFalse("Should not have thrown an IOException if " +
|
||||
"ignoring failures", ignoreFailures);
|
||||
// the IOException is not thrown again as it's expected
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered when the mapper copies file.", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unexpected exception encountered. ", e);
|
||||
Assert.fail("Test failed: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteState() throws IOException {
|
||||
pathList.clear();
|
||||
nFiles = 0;
|
||||
|
|
Loading…
Reference in New Issue