HADOOP-13823. s3a rename: fail if dest file exists. Contributed by Steve Loughran

This commit is contained in:
Mingliang Liu 2016-11-28 16:30:29 -08:00
parent be88d574a9
commit d60a60be8a
4 changed files with 130 additions and 46 deletions

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
/**
* Error to indicate that a specific rename failed.
* The exit code defines the exit code to be returned in the {@code rename()}
* call.
* Target path is set to destination.
*/
public class RenameFailedException extends PathIOException {
/**
* Exit code to be returned.
*/
private boolean exitCode = false;
public RenameFailedException(String src, String dest, Throwable cause) {
super(src, cause);
setOperation("rename");
setTargetPath(dest);
}
public RenameFailedException(String src, String dest, String error) {
super(src, error);
setOperation("rename");
setTargetPath(dest);
}
public RenameFailedException(Path src, Path optionalDest, String error) {
super(src.toString(), error);
setOperation("rename");
if (optionalDest != null) {
setTargetPath(optionalDest.toString());
}
}
public boolean getExitCode() {
return exitCode;
}
/**
* Set the exit code.
* @param code exit code to raise
* @return the exception
*/
public RenameFailedException withExitCode(boolean code) {
this.exitCode = code;
return this;
}
}

View File

@ -630,10 +630,12 @@ public class S3AFileSystem extends FileSystem {
* there is no Progressable passed in, this can time out jobs.
*
* Note: This implementation differs with other S3 drivers. Specifically:
* <pre>
* Fails if src is a file and dst is a directory.
* Fails if src is a directory and dst is a file.
* Fails if the parent of dst does not exist or is a file.
* Fails if dst is a directory that is not empty.
* </pre>
*
* @param src path to be renamed
* @param dst new path after rename
@ -645,58 +647,86 @@ public class S3AFileSystem extends FileSystem {
return innerRename(src, dst);
} catch (AmazonClientException e) {
throw translateException("rename(" + src +", " + dst + ")", src, e);
} catch (RenameFailedException e) {
LOG.debug(e.getMessage());
return e.getExitCode();
} catch (FileNotFoundException e) {
LOG.debug(e.toString());
return false;
}
}
/**
* The inner rename operation. See {@link #rename(Path, Path)} for
* the description of the operation.
* This operation throws an exception on any failure which needs to be
* reported and downgraded to a failure. That is: if a rename
* @param src path to be renamed
* @param dst new path after rename
* @return true if rename is successful
* @throws RenameFailedException if some criteria for a state changing
* rename was not met. This means work didn't happen; it's not something
* which is reported upstream to the FileSystem APIs, for which the semantics
* of "false" are pretty vague.
* @throws FileNotFoundException there's no source file.
* @throws IOException on IO failure.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private boolean innerRename(Path src, Path dst) throws IOException,
AmazonClientException {
private boolean innerRename(Path src, Path dst)
throws RenameFailedException, FileNotFoundException, IOException,
AmazonClientException {
LOG.debug("Rename path {} to {}", src, dst);
incrementStatistic(INVOCATION_RENAME);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
if (srcKey.isEmpty() || dstKey.isEmpty()) {
LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey);
return false;
if (srcKey.isEmpty()) {
throw new RenameFailedException(src, dst, "source is root directory");
}
if (dstKey.isEmpty()) {
throw new RenameFailedException(src, dst, "dest is root directory");
}
S3AFileStatus srcStatus;
try {
srcStatus = getFileStatus(src);
} catch (FileNotFoundException e) {
LOG.error("rename: src not found {}", src);
return false;
}
// get the source file status; this raises a FNFE if there is no source
// file.
S3AFileStatus srcStatus = getFileStatus(src);
if (srcKey.equals(dstKey)) {
LOG.debug("rename: src and dst refer to the same file or directory: {}",
LOG.debug("rename: src and dest refer to the same file or directory: {}",
dst);
return srcStatus.isFile();
throw new RenameFailedException(src, dst,
"source and dest refer to the same file or directory")
.withExitCode(srcStatus.isFile());
}
S3AFileStatus dstStatus = null;
try {
dstStatus = getFileStatus(dst);
if (srcStatus.isDirectory() && dstStatus.isFile()) {
LOG.debug("rename: src {} is a directory and dst {} is a file",
src, dst);
return false;
// if there is no destination entry, an exception is raised.
// hence this code sequence can assume that there is something
// at the end of the path; the only detail being what it is and
// whether or not it can be the destination of the rename.
if (srcStatus.isDirectory()) {
if (dstStatus.isFile()) {
throw new RenameFailedException(src, dst,
"source is a directory and dest is a file")
.withExitCode(srcStatus.isFile());
} else if (!dstStatus.isEmptyDirectory()) {
throw new RenameFailedException(src, dst,
"Destination is a non-empty directory")
.withExitCode(false);
}
// at this point the destination is an empty directory
} else {
// source is a file. The destination must be a directory,
// empty or not
if (dstStatus.isFile()) {
throw new RenameFailedException(src, dst,
"Cannot rename onto an existing file")
.withExitCode(false);
}
}
if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
return false;
}
} catch (FileNotFoundException e) {
LOG.debug("rename: destination path {} not found", dst);
// Parent must exist
@ -705,12 +735,12 @@ public class S3AFileSystem extends FileSystem {
try {
S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
if (!dstParentStatus.isDirectory()) {
return false;
throw new RenameFailedException(src, dst,
"destination parent is not a directory");
}
} catch (FileNotFoundException e2) {
LOG.debug("rename: destination path {} has no parent {}",
dst, parent);
return false;
throw new RenameFailedException(src, dst,
"destination has no parent ");
}
}
}
@ -745,9 +775,8 @@ public class S3AFileSystem extends FileSystem {
//Verify dest is not a child of the source directory
if (dstKey.startsWith(srcKey)) {
LOG.debug("cannot rename a directory {}" +
" to a subdirectory of self: {}", srcKey, dstKey);
return false;
throw new RenameFailedException(srcKey, dstKey,
"cannot rename a directory to a subdirectory o fitself ");
}
List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();

View File

@ -88,21 +88,6 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
// not supported
}
@Override
public void testRenameFileAsExistingFile() throws Exception {
if (!renameSupported()) {
return;
}
Path src = path("/test/hadoop/file");
createFile(src);
Path dst = path("/test/new/newfile");
createFile(dst);
// s3 doesn't support rename option
// rename-overwrites-dest is always allowed.
rename(src, dst, true, false, true);
}
@Override
public void testRenameDirectoryAsExistingDirectory() throws Exception {
if (!renameSupported()) {

View File

@ -114,7 +114,7 @@
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
<value>false</value>
</property>
</configuration>