svn merge -c 1548175 FIXES: HADOOP-10129. Distcp may succeed when it fails (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1548176 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-12-05 15:51:28 +00:00
parent c90426e4d7
commit 92fc940ea8
6 changed files with 113 additions and 7 deletions

View File

@ -169,6 +169,8 @@ Release 2.4.0 - UNRELEASED
HADOOP-10135 writes to swift fs over partition size leave temp files and HADOOP-10135 writes to swift fs over partition size leave temp files and
empty output file (David Dobbins via stevel) empty output file (David Dobbins via stevel)
HADOOP-10129. Distcp may succeed when it fails (daryn)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -2090,6 +2092,8 @@ Release 0.23.10 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-10129. Distcp may succeed when it fails (daryn)
Release 0.23.9 - 2013-07-08 Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -95,6 +95,11 @@
<scope>test</scope> <scope>test</scope>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import com.google.common.annotations.VisibleForTesting;
import java.io.*; import java.io.*;
import java.util.Stack; import java.util.Stack;
@ -107,12 +109,13 @@ public class SimpleCopyListing extends CopyListing {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException { public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
doBuildListing(getWriter(pathToListingFile), options);
SequenceFile.Writer fileListWriter = null; }
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException {
try { try {
fileListWriter = getWriter(pathToListingFile);
for (Path path: options.getSourcePaths()) { for (Path path: options.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf()); FileSystem sourceFS = path.getFileSystem(getConf());
path = makeQualified(path); path = makeQualified(path);
@ -140,8 +143,10 @@ public class SimpleCopyListing extends CopyListing {
writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile); writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
} }
} }
fileListWriter.close();
fileListWriter = null;
} finally { } finally {
IOUtils.closeStream(fileListWriter); IOUtils.cleanup(LOG, fileListWriter);
} }
} }

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import java.io.*; import java.io.*;
import java.util.EnumSet; import java.util.EnumSet;
@ -176,7 +178,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()); return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
} }
private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, @VisibleForTesting
long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
int bufferSize, Mapper.Context context) int bufferSize, Mapper.Context context)
throws IOException { throws IOException {
Path source = sourceFileStatus.getPath(); Path source = sourceFileStatus.getPath();
@ -193,6 +196,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
updateContextStatus(totalBytesRead, context, sourceFileStatus); updateContextStatus(totalBytesRead, context, sourceFileStatus);
bytesRead = inStream.read(buf); bytesRead = inStream.read(buf);
} }
outStream.close();
outStream = null;
} finally { } finally {
IOUtils.cleanup(LOG, outStream, inStream); IOUtils.cleanup(LOG, outStream, inStream);
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -35,6 +37,7 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.AfterClass; import org.junit.AfterClass;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List; import java.util.List;
@ -249,4 +252,29 @@ public class TestCopyListing extends SimpleCopyListing {
IOUtils.closeStream(reader); IOUtils.closeStream(reader);
} }
} }
@Test
public void testFailOnCloseError() throws IOException {
  // Temp file acting as the copy source the listing will enumerate.
  File srcFile = File.createTempFile("TestCopyListingIn", null);
  srcFile.deleteOnExit();
  // Temp file standing in for the target path of the DistCp options.
  File targetFile = File.createTempFile("TestCopyListingOut", null);
  targetFile.deleteOnExit();

  List<Path> sources = new ArrayList<Path>();
  sources.add(new Path(srcFile.toURI()));
  DistCpOptions options =
      new DistCpOptions(sources, new Path(targetFile.toURI()));

  // Mock writer whose close() throws, simulating a late flush failure;
  // doBuildListing must surface it rather than swallow it.
  Exception expectedEx = new IOException("boom");
  SequenceFile.Writer failingWriter = mock(SequenceFile.Writer.class);
  doThrow(expectedEx).when(failingWriter).close();

  SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
  Exception caught = null;
  try {
    listing.doBuildListing(failingWriter, options);
  } catch (Exception e) {
    caught = e;
  }
  Assert.assertNotNull("close writer didn't fail", caught);
  // The exact exception from close() must propagate unchanged.
  Assert.assertEquals(expectedEx, caught);
}
} }

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
public class TestRetriableFileCopyCommand {

  /**
   * Verifies that an exception thrown by the output stream's close()
   * propagates out of copyBytes() instead of being silently dropped.
   */
  @SuppressWarnings("rawtypes")
  @Test
  public void testFailOnCloseError() throws Exception {
    // Mapper context backed by a default Configuration.
    Mapper.Context context = mock(Mapper.Context.class);
    doReturn(new Configuration()).when(context).getConfiguration();

    // Output stream whose close() throws; copyBytes must surface this.
    Exception expectedEx = new IOException("boom");
    OutputStream out = mock(OutputStream.class);
    doThrow(expectedEx).when(out).close();

    // A real (empty-ish) temp file supplies the source FileStatus.
    File source = File.createTempFile(this.getClass().getSimpleName(), null);
    source.deleteOnExit();
    FileStatus sourceStatus =
        new FileStatus(1L, false, 1, 1024, 0, new Path(source.toURI()));

    Exception caught = null;
    try {
      new RetriableFileCopyCommand("testFailOnCloseError")
          .copyBytes(sourceStatus, out, 512, context);
    } catch (Exception e) {
      caught = e;
    }
    assertNotNull("close didn't fail", caught);
    // The exact exception raised by close() must be the one thrown.
    assertEquals(expectedEx, caught);
  }
}