diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e0b18a898f2..0f62dcf8bd9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -692,6 +692,9 @@ Release 0.23.3 - UNRELEASED org.apache.hadoop.classification.InterfaceAudience not found (Trevor Robinson via tgraves) + HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp + via tgraves) + Release 0.23.2 - UNRELEASED NEW FEATURES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java index a18b6e17368..a861840f07c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java @@ -24,6 +24,8 @@ import java.io.InputStream; import java.util.LinkedList; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException; import org.apache.hadoop.fs.shell.PathExceptions.PathIOException; @@ -232,31 +234,65 @@ abstract class CommandWithDestination extends FsCommand { if (target.exists && (target.stat.isDirectory() || !overwrite)) { throw new PathExistsException(target.toString()); } - target.fs.setWriteChecksum(writeChecksum); - PathData tempFile = null; + TargetFileSystem targetFs = new TargetFileSystem(target.fs); try { - tempFile = target.createTempFile(target+"._COPYING_"); - FSDataOutputStream out = target.fs.create(tempFile.path, true); - IOUtils.copyBytes(in, out, getConf(), true); + PathData tempTarget = target.suffix("._COPYING_"); + targetFs.setWriteChecksum(writeChecksum); + targetFs.writeStreamToFile(in, tempTarget); + targetFs.rename(tempTarget, target); + } finally { + targetFs.close(); // last ditch effort to ensure temp file is removed + } + } + + // Helper filter filesystem that registers created files as temp files to + // be deleted on exit unless successfully renamed + private static class TargetFileSystem extends FilterFileSystem { + TargetFileSystem(FileSystem fs) { + super(fs); + } + + void writeStreamToFile(InputStream in, PathData target) throws IOException { + FSDataOutputStream out = null; + try { + out = create(target); + IOUtils.copyBytes(in, out, getConf(), true); + } finally { + IOUtils.closeStream(out); // just in case copyBytes didn't + } + } + + // tag created files as temp files + FSDataOutputStream create(PathData item) throws IOException { + try { + return create(item.path, true); + } finally { // might have been created but stream was interrupted + deleteOnExit(item.path); + } + } + + void rename(PathData src, PathData target) throws IOException { // the rename method with an option to delete the target is deprecated - if (target.exists && !target.fs.delete(target.path, false)) { + if (target.exists && !delete(target.path, false)) { // too bad we don't know why it failed PathIOException e = new PathIOException(target.toString()); e.setOperation("delete"); throw e; } - if (!tempFile.fs.rename(tempFile.path, target.path)) { + if (!rename(src.path, target.path)) { // too bad we don't know why it failed - PathIOException e = new PathIOException(tempFile.toString()); + PathIOException e = new PathIOException(src.toString()); e.setOperation("rename"); e.setTargetPath(target.toString()); throw e; } - tempFile = null; - } finally { - if (tempFile != null) { - tempFile.fs.delete(tempFile.path, false); - } + // cancel delete on exit if rename is successful + cancelDeleteOnExit(src.path); + } + @Override + public void close() { + // purge any remaining temp files, but don't close underlying fs + processDeleteOnExit(); } } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java index 1e3dfd2c4a2..b53d2820de4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java @@ -60,7 +60,7 @@ public class PathData implements Comparable { * @throws IOException if anything goes wrong... */ public PathData(String pathString, Configuration conf) throws IOException { - this(FileSystem.get(URI.create(pathString), conf), pathString); + this(FileSystem.get(stringToUri(pathString), conf), pathString); } /** @@ -170,16 +170,13 @@ public class PathData implements Comparable { } /** - * Returns a temporary file for this PathData with the given extension. - * The file will be deleted on exit. - * @param extension for the temporary file + * Returns a new PathData with the given extension. + * @param extension for the suffix * @return PathData * @throws IOException shouldn't happen */ - public PathData createTempFile(String extension) throws IOException { - PathData tmpFile = new PathData(fs, uri+"._COPYING_"); - fs.deleteOnExit(tmpFile.path); - return tmpFile; + public PathData suffix(String extension) throws IOException { + return new PathData(fs, this+extension); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java new file mode 100644 index 00000000000..01336288b4f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.shell; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.shell.CopyCommands.Put; +import org.apache.hadoop.util.Progressable; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.stubbing.OngoingStubbing; + +public class TestCopy { + static Configuration conf; + static Path path = new Path("mockfs:/file"); + static Path tmpPath = new Path("mockfs:/file._COPYING_"); + static Put cmd; + static FileSystem mockFs; + static PathData target; + static FileStatus fileStat; + + @BeforeClass + public static void setup() throws IOException { + conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + mockFs = mock(FileSystem.class); + fileStat = mock(FileStatus.class); + when(fileStat.isDirectory()).thenReturn(false); + } + + @Before + public void resetMock() throws IOException { + reset(mockFs); + target = new PathData(path.toString(), conf); + cmd = new CopyCommands.Put(); + cmd.setConf(conf); + } + + @Test + public void testCopyStreamTarget() throws Exception { + FSDataOutputStream out = mock(FSDataOutputStream.class); + whenFsCreate().thenReturn(out); + when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); + when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true); + FSInputStream in = mock(FSInputStream.class); + when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); + + tryCopyStream(in, true); + verify(mockFs, never()).delete(eq(path), anyBoolean()); + verify(mockFs).rename(eq(tmpPath), eq(path)); + verify(mockFs, never()).delete(eq(tmpPath), anyBoolean()); + verify(mockFs, never()).close(); + } + + @Test + public void testCopyStreamTargetExists() throws Exception { + FSDataOutputStream out = mock(FSDataOutputStream.class); + whenFsCreate().thenReturn(out); + when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat); + target.refreshStatus(); // so it's updated as existing + cmd.setOverwrite(true); + when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); + when(mockFs.delete(eq(path), eq(false))).thenReturn(true); + when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true); + FSInputStream in = mock(FSInputStream.class); + when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); + + tryCopyStream(in, true); + verify(mockFs).delete(eq(path), anyBoolean()); + verify(mockFs).rename(eq(tmpPath), eq(path)); + verify(mockFs, never()).delete(eq(tmpPath), anyBoolean()); + verify(mockFs, never()).close(); + } + + @Test + public void testInterruptedCreate() throws Exception { + whenFsCreate().thenThrow(new InterruptedIOException()); + when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); + FSDataInputStream in = mock(FSDataInputStream.class); + + tryCopyStream(in, false); + verify(mockFs).delete(eq(tmpPath), anyBoolean()); + verify(mockFs, never()).rename(any(Path.class), any(Path.class)); + verify(mockFs, never()).delete(eq(path), anyBoolean()); + verify(mockFs, never()).close(); + } + + @Test + public void testInterruptedCopyBytes() throws Exception { + FSDataOutputStream out = mock(FSDataOutputStream.class); + whenFsCreate().thenReturn(out); + when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); + FSInputStream in = mock(FSInputStream.class); + // make IOUtils.copyBytes fail + when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow( + new InterruptedIOException()); + + tryCopyStream(in, false); + verify(mockFs).delete(eq(tmpPath), anyBoolean()); + verify(mockFs, never()).rename(any(Path.class), any(Path.class)); + verify(mockFs, never()).delete(eq(path), anyBoolean()); + verify(mockFs, never()).close(); + } + + @Test + public void testInterruptedRename() throws Exception { + FSDataOutputStream out = mock(FSDataOutputStream.class); + whenFsCreate().thenReturn(out); + when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat); + when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow( + new InterruptedIOException()); + FSInputStream in = mock(FSInputStream.class); + when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1); + + tryCopyStream(in, false); + verify(mockFs).delete(eq(tmpPath), anyBoolean()); + verify(mockFs).rename(eq(tmpPath), eq(path)); + verify(mockFs, never()).delete(eq(path), anyBoolean()); + verify(mockFs, never()).close(); + } + + private OngoingStubbing whenFsCreate() throws IOException { + return when(mockFs.create(eq(tmpPath), any(FsPermission.class), + anyBoolean(), anyInt(), anyShort(), anyLong(), + any(Progressable.class))); + } + + private void tryCopyStream(InputStream in, boolean shouldPass) { + try { + cmd.copyStreamToTarget(new FSDataInputStream(in), target); + } catch (InterruptedIOException e) { + assertFalse("copy failed", shouldPass); + } catch (Throwable e) { + assertFalse(e.getMessage(), shouldPass); + } + } + + static class MockFileSystem extends FilterFileSystem { + Configuration conf; + MockFileSystem() { + super(mockFs); + } + @Override + public void initialize(URI uri, Configuration conf) { + this.conf = conf; + } + @Override + public Path makeQualified(Path path) { + return path; + } + @Override + public Configuration getConf() { + return conf; + } + } +} \ No newline at end of file