From af942585a108d70e0946f6dd4c465a54d068eabf Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 4 May 2016 10:23:04 -0700 Subject: [PATCH] HADOOP-12469. distcp should not ignore the ignoreFailures option. Contributed by Mingliang Liu. --- .../hadoop/tools/mapred/CopyMapper.java | 6 +- .../hadoop/tools/mapred/TestCopyMapper.java | 85 +++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 09bceadbc59..4db1d4eb464 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.EnumSet; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,7 @@ import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException; import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.util.StringUtils; @@ -251,8 +253,8 @@ public class CopyMapper extends Mapper LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + target, exception); - if (ignoreFailures && exception.getCause() instanceof - RetriableFileCopyCommand.CopyReadException) { + if (ignoreFailures && + ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) { incrementCounter(context, Counter.FAIL, 1); incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen()); context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " + diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 4d0752fef16..866ad6e178e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -392,6 +392,8 @@ public class TestCopyMapper { public void testIgnoreFailures() { doTestIgnoreFailures(true); doTestIgnoreFailures(false); + doTestIgnoreFailuresDoubleWrapped(true); + doTestIgnoreFailuresDoubleWrapped(false); } @Test(timeout=40000) @@ -800,6 +802,89 @@ public class TestCopyMapper { } } + /** + * This test covers the case where the CopyReadException is double-wrapped and + * the mapper should be able to ignore this nested read exception. + * @see #doTestIgnoreFailures + */ + private void doTestIgnoreFailuresDoubleWrapped(final boolean ignoreFailures) { + try { + deleteState(); + createSourceData(); + + final UserGroupInformation tmpUser = UserGroupInformation + .createRemoteUser("guest"); + + final CopyMapper copyMapper = new CopyMapper(); + + final Mapper.Context context = + tmpUser.doAs(new PrivilegedAction< + Mapper.Context>() { + @Override + public Mapper.Context + run() { + try { + StubContext stubContext = new StubContext( + getConfiguration(), null, 0); + return stubContext.getContext(); + } catch (Exception e) { + LOG.error("Exception encountered when get stub context", e); + throw new RuntimeException(e); + } + } + }); + + touchFile(SOURCE_PATH + "/src/file"); + mkdirs(TARGET_PATH); + cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"), + new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE)); + cluster.getFileSystem().setPermission(new Path(TARGET_PATH), + new FsPermission((short)511)); + + context.getConfiguration().setBoolean( + DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures); + + final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction() { + @Override + public FileSystem run() { + try { + return FileSystem.get(configuration); + } catch (IOException e) { + LOG.error("Exception encountered when get FileSystem.", e); + throw new RuntimeException(e); + } + } + }); + + tmpUser.doAs(new PrivilegedAction() { + @Override + public Integer run() { + try { + copyMapper.setup(context); + copyMapper.map(new Text("/src/file"), + new CopyListingFileStatus(tmpFS.getFileStatus( + new Path(SOURCE_PATH + "/src/file"))), + context); + Assert.assertTrue("Should have thrown an IOException if not " + + "ignoring failures", ignoreFailures); + } catch (IOException e) { + LOG.error("Unexpected exception encountered. ", e); + Assert.assertFalse("Should not have thrown an IOException if " + + "ignoring failures", ignoreFailures); + // the IOException is not thrown again as it's expected + } catch (Exception e) { + LOG.error("Exception encountered when the mapper copies file.", e); + throw new RuntimeException(e); + } + return null; + } + }); + } catch (Exception e) { + LOG.error("Unexpected exception encountered. ", e); + Assert.fail("Test failed: " + e.getMessage()); + } + } + private static void deleteState() throws IOException { pathList.clear(); nFiles = 0;