diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 2f6600619d2..05e03289ac8 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -169,6 +169,8 @@ Release 2.4.0 - UNRELEASED
HADOOP-10135 writes to swift fs over partition size leave temp files and
empty output file (David Dobbins via stevel)
+ HADOOP-10129. Distcp may succeed when it fails (daryn)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -2090,6 +2092,8 @@ Release 0.23.10 - UNRELEASED
BUG FIXES
+ HADOOP-10129. Distcp may succeed when it fails (daryn)
+
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml
index c29de998a68..c84cac0529b 100644
--- a/hadoop-tools/hadoop-distcp/pom.xml
+++ b/hadoop-tools/hadoop-distcp/pom.xml
@@ -95,6 +95,11 @@
test
test-jar
+
+ org.mockito
+ mockito-all
+ test
+
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 53c55b78421..abd184517c9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.*;
import java.util.Stack;
@@ -107,12 +109,13 @@ public class SimpleCopyListing extends CopyListing {
/** {@inheritDoc} */
@Override
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
-
- SequenceFile.Writer fileListWriter = null;
-
+ doBuildListing(getWriter(pathToListingFile), options);
+ }
+
+ @VisibleForTesting
+ public void doBuildListing(SequenceFile.Writer fileListWriter,
+ DistCpOptions options) throws IOException {
try {
- fileListWriter = getWriter(pathToListingFile);
-
for (Path path: options.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf());
path = makeQualified(path);
@@ -140,8 +143,10 @@ public class SimpleCopyListing extends CopyListing {
writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
}
}
+ fileListWriter.close();
+ fileListWriter = null;
} finally {
- IOUtils.closeStream(fileListWriter);
+ IOUtils.cleanup(LOG, fileListWriter);
}
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 87fb2d4511e..580229cf8e8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.*;
import java.util.EnumSet;
@@ -176,7 +178,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
}
- private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
+ @VisibleForTesting
+ long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
int bufferSize, Mapper.Context context)
throws IOException {
Path source = sourceFileStatus.getPath();
@@ -193,6 +196,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
updateContextStatus(totalBytesRead, context, sourceFileStatus);
bytesRead = inStream.read(buf);
}
+ outStream.close();
+ outStream = null;
} finally {
IOUtils.cleanup(LOG, outStream, inStream);
}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index a0dfec82204..a932771baf7 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.tools;
+import static org.mockito.Mockito.*;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -35,6 +37,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.AfterClass;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@@ -249,4 +252,29 @@ public class TestCopyListing extends SimpleCopyListing {
IOUtils.closeStream(reader);
}
}
+
+ @Test
+ public void testFailOnCloseError() throws IOException {
+ File inFile = File.createTempFile("TestCopyListingIn", null);
+ inFile.deleteOnExit();
+ File outFile = File.createTempFile("TestCopyListingOut", null);
+ outFile.deleteOnExit();
+ List srcs = new ArrayList();
+ srcs.add(new Path(inFile.toURI()));
+
+ Exception expectedEx = new IOException("boom");
+ SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
+ doThrow(expectedEx).when(writer).close();
+
+ SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+ DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
+ Exception actualEx = null;
+ try {
+ listing.doBuildListing(writer, options);
+ } catch (Exception e) {
+ actualEx = e;
+ }
+ Assert.assertNotNull("close writer didn't fail", actualEx);
+ Assert.assertEquals(expectedEx, actualEx);
+ }
}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
new file mode 100644
index 00000000000..c5ec513bec5
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestRetriableFileCopyCommand {
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testFailOnCloseError() throws Exception {
+ Mapper.Context context = mock(Mapper.Context.class);
+ doReturn(new Configuration()).when(context).getConfiguration();
+
+ Exception expectedEx = new IOException("boom");
+ OutputStream out = mock(OutputStream.class);
+ doThrow(expectedEx).when(out).close();
+
+ File f = File.createTempFile(this.getClass().getSimpleName(), null);
+ f.deleteOnExit();
+ FileStatus stat =
+ new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI()));
+
+ Exception actualEx = null;
+ try {
+ new RetriableFileCopyCommand("testFailOnCloseError")
+ .copyBytes(stat, out, 512, context);
+ } catch (Exception e) {
+ actualEx = e;
+ }
+ assertNotNull("close didn't fail", actualEx);
+ assertEquals(expectedEx, actualEx);
+ }
+}