merge -r 1368001:1368002 from trunk. FIXES: HADOOP-8633

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1368003 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-08-01 14:03:01 +00:00
parent af6d62a9c1
commit 34f90dc93b
4 changed files with 245 additions and 21 deletions

View File

@ -692,6 +692,9 @@ Release 0.23.3 - UNRELEASED
org.apache.hadoop.classification.InterfaceAudience not found (Trevor org.apache.hadoop.classification.InterfaceAudience not found (Trevor
Robinson via tgraves) Robinson via tgraves)
HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp
via tgraves)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
NEW FEATURES NEW FEATURES

View File

@ -24,6 +24,8 @@ import java.io.InputStream;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException; import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException; import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
@ -232,31 +234,65 @@ abstract class CommandWithDestination extends FsCommand {
if (target.exists && (target.stat.isDirectory() || !overwrite)) { if (target.exists && (target.stat.isDirectory() || !overwrite)) {
throw new PathExistsException(target.toString()); throw new PathExistsException(target.toString());
} }
target.fs.setWriteChecksum(writeChecksum); TargetFileSystem targetFs = new TargetFileSystem(target.fs);
PathData tempFile = null;
try { try {
tempFile = target.createTempFile(target+"._COPYING_"); PathData tempTarget = target.suffix("._COPYING_");
FSDataOutputStream out = target.fs.create(tempFile.path, true); targetFs.setWriteChecksum(writeChecksum);
targetFs.writeStreamToFile(in, tempTarget);
targetFs.rename(tempTarget, target);
} finally {
targetFs.close(); // last ditch effort to ensure temp file is removed
}
}
// Helper filter filesystem that registers created files as temp files to
// be deleted on exit unless successfully renamed
private static class TargetFileSystem extends FilterFileSystem {
TargetFileSystem(FileSystem fs) {
super(fs);
}
void writeStreamToFile(InputStream in, PathData target) throws IOException {
FSDataOutputStream out = null;
try {
out = create(target);
IOUtils.copyBytes(in, out, getConf(), true); IOUtils.copyBytes(in, out, getConf(), true);
} finally {
IOUtils.closeStream(out); // just in case copyBytes didn't
}
}
// tag created files as temp files
FSDataOutputStream create(PathData item) throws IOException {
try {
return create(item.path, true);
} finally { // might have been created but stream was interrupted
deleteOnExit(item.path);
}
}
void rename(PathData src, PathData target) throws IOException {
// the rename method with an option to delete the target is deprecated // the rename method with an option to delete the target is deprecated
if (target.exists && !target.fs.delete(target.path, false)) { if (target.exists && !delete(target.path, false)) {
// too bad we don't know why it failed // too bad we don't know why it failed
PathIOException e = new PathIOException(target.toString()); PathIOException e = new PathIOException(target.toString());
e.setOperation("delete"); e.setOperation("delete");
throw e; throw e;
} }
if (!tempFile.fs.rename(tempFile.path, target.path)) { if (!rename(src.path, target.path)) {
// too bad we don't know why it failed // too bad we don't know why it failed
PathIOException e = new PathIOException(tempFile.toString()); PathIOException e = new PathIOException(src.toString());
e.setOperation("rename"); e.setOperation("rename");
e.setTargetPath(target.toString()); e.setTargetPath(target.toString());
throw e; throw e;
} }
tempFile = null; // cancel delete on exit if rename is successful
} finally { cancelDeleteOnExit(src.path);
if (tempFile != null) {
tempFile.fs.delete(tempFile.path, false);
} }
@Override
public void close() {
// purge any remaining temp files, but don't close underlying fs
processDeleteOnExit();
} }
} }
} }

View File

@ -60,7 +60,7 @@ public class PathData implements Comparable<PathData> {
* @throws IOException if anything goes wrong... * @throws IOException if anything goes wrong...
*/ */
public PathData(String pathString, Configuration conf) throws IOException { public PathData(String pathString, Configuration conf) throws IOException {
this(FileSystem.get(URI.create(pathString), conf), pathString); this(FileSystem.get(stringToUri(pathString), conf), pathString);
} }
/** /**
@ -170,16 +170,13 @@ public class PathData implements Comparable<PathData> {
} }
/** /**
* Returns a temporary file for this PathData with the given extension. * Returns a new PathData with the given extension.
* The file will be deleted on exit. * @param extension for the suffix
* @param extension for the temporary file
* @return PathData * @return PathData
* @throws IOException shouldn't happen * @throws IOException shouldn't happen
*/ */
public PathData createTempFile(String extension) throws IOException { public PathData suffix(String extension) throws IOException {
PathData tmpFile = new PathData(fs, uri+"._COPYING_"); return new PathData(fs, this+extension);
fs.deleteOnExit(tmpFile.path);
return tmpFile;
} }
/** /**

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.shell;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.shell.CopyCommands.Put;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;
public class TestCopy {
static Configuration conf;
static Path path = new Path("mockfs:/file");
static Path tmpPath = new Path("mockfs:/file._COPYING_");
static Put cmd;
static FileSystem mockFs;
static PathData target;
static FileStatus fileStat;
@BeforeClass
public static void setup() throws IOException {
conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
mockFs = mock(FileSystem.class);
fileStat = mock(FileStatus.class);
when(fileStat.isDirectory()).thenReturn(false);
}
@Before
public void resetMock() throws IOException {
reset(mockFs);
target = new PathData(path.toString(), conf);
cmd = new CopyCommands.Put();
cmd.setConf(conf);
}
@Test
public void testCopyStreamTarget() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testCopyStreamTargetExists() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
target.refreshStatus(); // so it's updated as existing
cmd.setOverwrite(true);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, true);
verify(mockFs).delete(eq(path), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCreate() throws Exception {
whenFsCreate().thenThrow(new InterruptedIOException());
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSDataInputStream in = mock(FSDataInputStream.class);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedCopyBytes() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
FSInputStream in = mock(FSInputStream.class);
// make IOUtils.copyBytes fail
when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
new InterruptedIOException());
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
@Test
public void testInterruptedRename() throws Exception {
FSDataOutputStream out = mock(FSDataOutputStream.class);
whenFsCreate().thenReturn(out);
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
new InterruptedIOException());
FSInputStream in = mock(FSInputStream.class);
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
tryCopyStream(in, false);
verify(mockFs).delete(eq(tmpPath), anyBoolean());
verify(mockFs).rename(eq(tmpPath), eq(path));
verify(mockFs, never()).delete(eq(path), anyBoolean());
verify(mockFs, never()).close();
}
private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws IOException {
return when(mockFs.create(eq(tmpPath), any(FsPermission.class),
anyBoolean(), anyInt(), anyShort(), anyLong(),
any(Progressable.class)));
}
private void tryCopyStream(InputStream in, boolean shouldPass) {
try {
cmd.copyStreamToTarget(new FSDataInputStream(in), target);
} catch (InterruptedIOException e) {
assertFalse("copy failed", shouldPass);
} catch (Throwable e) {
assertFalse(e.getMessage(), shouldPass);
}
}
static class MockFileSystem extends FilterFileSystem {
Configuration conf;
MockFileSystem() {
super(mockFs);
}
@Override
public void initialize(URI uri, Configuration conf) {
this.conf = conf;
}
@Override
public Path makeQualified(Path path) {
return path;
}
@Override
public Configuration getConf() {
return conf;
}
}
}