HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1368002 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8ea7112595
commit
12d0e02591
|
@ -873,6 +873,9 @@ Release 0.23.3 - UNRELEASED
|
|||
org.apache.hadoop.classification.InterfaceAudience not found (Trevor
|
||||
Robinson via tgraves)
|
||||
|
||||
HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp
|
||||
via tgraves)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.io.InputStream;
|
|||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
|
||||
import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
|
||||
|
@ -232,31 +234,65 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
if (target.exists && (target.stat.isDirectory() || !overwrite)) {
|
||||
throw new PathExistsException(target.toString());
|
||||
}
|
||||
target.fs.setWriteChecksum(writeChecksum);
|
||||
PathData tempFile = null;
|
||||
TargetFileSystem targetFs = new TargetFileSystem(target.fs);
|
||||
try {
|
||||
tempFile = target.createTempFile(target+"._COPYING_");
|
||||
FSDataOutputStream out = target.fs.create(tempFile.path, true);
|
||||
IOUtils.copyBytes(in, out, getConf(), true);
|
||||
PathData tempTarget = target.suffix("._COPYING_");
|
||||
targetFs.setWriteChecksum(writeChecksum);
|
||||
targetFs.writeStreamToFile(in, tempTarget);
|
||||
targetFs.rename(tempTarget, target);
|
||||
} finally {
|
||||
targetFs.close(); // last ditch effort to ensure temp file is removed
|
||||
}
|
||||
}
|
||||
|
||||
// Helper filter filesystem that registers created files as temp files to
|
||||
// be deleted on exit unless successfully renamed
|
||||
private static class TargetFileSystem extends FilterFileSystem {
|
||||
TargetFileSystem(FileSystem fs) {
|
||||
super(fs);
|
||||
}
|
||||
|
||||
void writeStreamToFile(InputStream in, PathData target) throws IOException {
|
||||
FSDataOutputStream out = null;
|
||||
try {
|
||||
out = create(target);
|
||||
IOUtils.copyBytes(in, out, getConf(), true);
|
||||
} finally {
|
||||
IOUtils.closeStream(out); // just in case copyBytes didn't
|
||||
}
|
||||
}
|
||||
|
||||
// tag created files as temp files
|
||||
FSDataOutputStream create(PathData item) throws IOException {
|
||||
try {
|
||||
return create(item.path, true);
|
||||
} finally { // might have been created but stream was interrupted
|
||||
deleteOnExit(item.path);
|
||||
}
|
||||
}
|
||||
|
||||
void rename(PathData src, PathData target) throws IOException {
|
||||
// the rename method with an option to delete the target is deprecated
|
||||
if (target.exists && !target.fs.delete(target.path, false)) {
|
||||
if (target.exists && !delete(target.path, false)) {
|
||||
// too bad we don't know why it failed
|
||||
PathIOException e = new PathIOException(target.toString());
|
||||
e.setOperation("delete");
|
||||
throw e;
|
||||
}
|
||||
if (!tempFile.fs.rename(tempFile.path, target.path)) {
|
||||
if (!rename(src.path, target.path)) {
|
||||
// too bad we don't know why it failed
|
||||
PathIOException e = new PathIOException(tempFile.toString());
|
||||
PathIOException e = new PathIOException(src.toString());
|
||||
e.setOperation("rename");
|
||||
e.setTargetPath(target.toString());
|
||||
throw e;
|
||||
}
|
||||
tempFile = null;
|
||||
} finally {
|
||||
if (tempFile != null) {
|
||||
tempFile.fs.delete(tempFile.path, false);
|
||||
}
|
||||
// cancel delete on exit if rename is successful
|
||||
cancelDeleteOnExit(src.path);
|
||||
}
|
||||
@Override
|
||||
public void close() {
|
||||
// purge any remaining temp files, but don't close underlying fs
|
||||
processDeleteOnExit();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -60,7 +60,7 @@ public class PathData implements Comparable<PathData> {
|
|||
* @throws IOException if anything goes wrong...
|
||||
*/
|
||||
public PathData(String pathString, Configuration conf) throws IOException {
|
||||
this(FileSystem.get(URI.create(pathString), conf), pathString);
|
||||
this(FileSystem.get(stringToUri(pathString), conf), pathString);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -170,16 +170,13 @@ public class PathData implements Comparable<PathData> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a temporary file for this PathData with the given extension.
|
||||
* The file will be deleted on exit.
|
||||
* @param extension for the temporary file
|
||||
* Returns a new PathData with the given extension.
|
||||
* @param extension for the suffix
|
||||
* @return PathData
|
||||
* @throws IOException shouldn't happen
|
||||
*/
|
||||
public PathData createTempFile(String extension) throws IOException {
|
||||
PathData tmpFile = new PathData(fs, uri+"._COPYING_");
|
||||
fs.deleteOnExit(tmpFile.path);
|
||||
return tmpFile;
|
||||
public PathData suffix(String extension) throws IOException {
|
||||
return new PathData(fs, this+extension);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.OngoingStubbing;
|
||||
|
||||
public class TestCopy {
|
||||
static Configuration conf;
|
||||
static Path path = new Path("mockfs:/file");
|
||||
static Path tmpPath = new Path("mockfs:/file._COPYING_");
|
||||
static Put cmd;
|
||||
static FileSystem mockFs;
|
||||
static PathData target;
|
||||
static FileStatus fileStat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException {
|
||||
conf = new Configuration();
|
||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||
mockFs = mock(FileSystem.class);
|
||||
fileStat = mock(FileStatus.class);
|
||||
when(fileStat.isDirectory()).thenReturn(false);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void resetMock() throws IOException {
|
||||
reset(mockFs);
|
||||
target = new PathData(path.toString(), conf);
|
||||
cmd = new CopyCommands.Put();
|
||||
cmd.setConf(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyStreamTarget() throws Exception {
|
||||
FSDataOutputStream out = mock(FSDataOutputStream.class);
|
||||
whenFsCreate().thenReturn(out);
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
|
||||
FSInputStream in = mock(FSInputStream.class);
|
||||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
||||
|
||||
tryCopyStream(in, true);
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs).rename(eq(tmpPath), eq(path));
|
||||
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyStreamTargetExists() throws Exception {
|
||||
FSDataOutputStream out = mock(FSDataOutputStream.class);
|
||||
whenFsCreate().thenReturn(out);
|
||||
when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
|
||||
target.refreshStatus(); // so it's updated as existing
|
||||
cmd.setOverwrite(true);
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
|
||||
when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
|
||||
FSInputStream in = mock(FSInputStream.class);
|
||||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
||||
|
||||
tryCopyStream(in, true);
|
||||
verify(mockFs).delete(eq(path), anyBoolean());
|
||||
verify(mockFs).rename(eq(tmpPath), eq(path));
|
||||
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedCreate() throws Exception {
|
||||
whenFsCreate().thenThrow(new InterruptedIOException());
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||
|
||||
tryCopyStream(in, false);
|
||||
verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedCopyBytes() throws Exception {
|
||||
FSDataOutputStream out = mock(FSDataOutputStream.class);
|
||||
whenFsCreate().thenReturn(out);
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
FSInputStream in = mock(FSInputStream.class);
|
||||
// make IOUtils.copyBytes fail
|
||||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
|
||||
new InterruptedIOException());
|
||||
|
||||
tryCopyStream(in, false);
|
||||
verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptedRename() throws Exception {
|
||||
FSDataOutputStream out = mock(FSDataOutputStream.class);
|
||||
whenFsCreate().thenReturn(out);
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
|
||||
new InterruptedIOException());
|
||||
FSInputStream in = mock(FSInputStream.class);
|
||||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
||||
|
||||
tryCopyStream(in, false);
|
||||
verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs).rename(eq(tmpPath), eq(path));
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws IOException {
|
||||
return when(mockFs.create(eq(tmpPath), any(FsPermission.class),
|
||||
anyBoolean(), anyInt(), anyShort(), anyLong(),
|
||||
any(Progressable.class)));
|
||||
}
|
||||
|
||||
private void tryCopyStream(InputStream in, boolean shouldPass) {
|
||||
try {
|
||||
cmd.copyStreamToTarget(new FSDataInputStream(in), target);
|
||||
} catch (InterruptedIOException e) {
|
||||
assertFalse("copy failed", shouldPass);
|
||||
} catch (Throwable e) {
|
||||
assertFalse(e.getMessage(), shouldPass);
|
||||
}
|
||||
}
|
||||
|
||||
static class MockFileSystem extends FilterFileSystem {
|
||||
Configuration conf;
|
||||
MockFileSystem() {
|
||||
super(mockFs);
|
||||
}
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
@Override
|
||||
public Path makeQualified(Path path) {
|
||||
return path;
|
||||
}
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue