HADOOP-17139 Re-enable optimized copyFromLocal implementation in S3AFileSystem (#3101)

This work
* Defines the behavior of FileSystem.copyFromLocal in filesystem.md
* Implements a high performance implementation of copyFromLocalOperation
  for S3 
* Adds a contract test for the operation: AbstractContractCopyFromLocalTest
* Implements the contract tests for Local and S3A FileSystems

Contributed by: Bogdan Stolojan
This commit is contained in:
Petre Bogdan Stolojan 2021-07-30 19:42:08 +01:00 committed by GitHub
parent 6d77f3b6cd
commit a218038960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1172 additions and 181 deletions

View File

@ -524,6 +524,9 @@ public class FileUtil {
if (null != sdst) {
if (sdst.isDirectory()) {
if (null == srcName) {
if (overwrite) {
return dst;
}
throw new PathIsDirectoryException(dst.toString());
}
return checkDest(null, dstFS, new Path(dst, srcName), overwrite);

View File

@ -1419,6 +1419,112 @@ operations related to the part of the file being truncated is undefined.
### `boolean copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)`
The source file or directory at `src` is on the local disk and is copied into the file system at
destination `dst`. If the source must be deleted after the move then `delSrc` flag must be
set to TRUE. If destination already exists, and the destination contents must be overwritten
then `overwrite` flag must be set to TRUE.
#### Preconditions
Source and destination must be different
```python
if src = dest : raise FileExistsException
```
Destination and source must not be descendants one another
```python
if isDescendant(src, dest) or isDescendant(dest, src) : raise IOException
```
The source file or directory must exist locally:
```python
if not exists(LocalFS, src) : raise FileNotFoundException
```
Directories cannot be copied into files regardless to what the overwrite flag is set to:
```python
if isDir(LocalFS, src) and isFile(FS, dst) : raise PathExistsException
```
For all cases, except the one for which the above precondition throws, the overwrite flag must be
set to TRUE for the operation to succeed if destination exists. This will also overwrite any files
/ directories at the destination:
```python
if exists(FS, dst) and not overwrite : raise PathExistsException
```
#### Determining the final name of the copy
Given a base path on the source `base` and a child path `child` where `base` is in
`ancestors(child) + child`:
```python
def final_name(base, child, dest):
is base = child:
return dest
else:
return dest + childElements(base, child)
```
#### Outcome where source is a file `isFile(LocalFS, src)`
For a file, data at destination becomes that of the source. All ancestors are directories.
```python
if isFile(LocalFS, src) and (not exists(FS, dest) or (exists(FS, dest) and overwrite)):
FS' = FS where:
FS'.Files[dest] = LocalFS.Files[src]
FS'.Directories = FS.Directories + ancestors(FS, dest)
LocalFS' = LocalFS where
not delSrc or (delSrc = true and delete(LocalFS, src, false))
else if isFile(LocalFS, src) and isDir(FS, dest):
FS' = FS where:
let d = final_name(src, dest)
FS'.Files[d] = LocalFS.Files[src]
LocalFS' = LocalFS where:
not delSrc or (delSrc = true and delete(LocalFS, src, false))
```
There are no expectations that the file changes are atomic for both local `LocalFS` and remote `FS`.
#### Outcome where source is a directory `isDir(LocalFS, src)`
```python
if isDir(LocalFS, src) and (isFile(FS, dest) or isFile(FS, dest + childElements(src))):
raise FileAlreadyExistsException
else if isDir(LocalFS, src):
if exists(FS, dest):
dest' = dest + childElements(src)
if exists(FS, dest') and not overwrite:
raise PathExistsException
else:
dest' = dest
FS' = FS where:
forall c in descendants(LocalFS, src):
not exists(FS', final_name(c)) or overwrite
and forall c in descendants(LocalFS, src) where isDir(LocalFS, c):
FS'.Directories = FS'.Directories + (dest' + childElements(src, c))
and forall c in descendants(LocalFS, src) where isFile(LocalFS, c):
FS'.Files[final_name(c, dest')] = LocalFS.Files[c]
LocalFS' = LocalFS where
not delSrc or (delSrc = true and delete(LocalFS, src, true))
```
There are no expectations of operation isolation / atomicity.
This means files can change in source or destination while the operation is executing.
No guarantees are made for the final state of the file or directory after a copy other than it is
best effort. E.g.: when copying a directory, one file can be moved from source to destination but
there's nothing stopping the new file at destination being updated while the copy operation is still
in place.
#### Implementation
The default HDFS implementation, is to recurse through each file and folder, found at `src`, and
copy them sequentially to their final destination (relative to `dst`).
Object store based file systems should be mindful of what limitations arise from the above
implementation and could take advantage of parallel uploads and possible re-ordering of files copied
into the store to maximize throughput.
## <a name="RemoteIterator"></a> interface `RemoteIterator`
The `RemoteIterator` interface is used as a remote-access equivalent

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.File;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
public class TestLocalFSCopyFromLocal extends AbstractContractCopyFromLocalTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
@Test
public void testDestinationFileIsToParentDirectory() throws Throwable {
describe("Source is a file and destination is its own parent directory");
File file = createTempFile("local");
Path dest = new Path(file.getParentFile().toURI());
Path src = new Path(file.toURI());
intercept(PathOperationException.class,
() -> getFileSystem().copyFromLocalFile( true, true, src, dest));
}
@Test
public void testDestinationDirectoryToSelf() throws Throwable {
describe("Source is a directory and it is copied into itself with " +
"delSrc flag set, destination must not exist");
File source = createTempDirectory("srcDir");
Path dest = new Path(source.toURI());
getFileSystem().copyFromLocalFile( true, true, dest, dest);
assertPathDoesNotExist("Source found", dest);
}
@Test
public void testSourceIntoDestinationSubDirectoryWithDelSrc() throws Throwable {
describe("Copying a parent folder inside a child folder with" +
" delSrc=TRUE");
File parent = createTempDirectory("parent");
File child = createTempDirectory(parent, "child");
Path src = new Path(parent.toURI());
Path dest = new Path(child.toURI());
getFileSystem().copyFromLocalFile(true, true, src, dest);
assertPathDoesNotExist("Source found", src);
assertPathDoesNotExist("Destination found", dest);
}
@Test
public void testSourceIntoDestinationSubDirectory() throws Throwable {
describe("Copying a parent folder inside a child folder with" +
" delSrc=FALSE");
File parent = createTempDirectory("parent");
File child = createTempDirectory(parent, "child");
Path src = new Path(parent.toURI());
Path dest = new Path(child.toURI());
getFileSystem().copyFromLocalFile(false, true, src, dest);
Path recursiveParent = new Path(dest, parent.getName());
Path recursiveChild = new Path(recursiveParent, child.getName());
// This definitely counts as interesting behaviour which needs documented
// Depending on the underlying system this can recurse 15+ times
recursiveParent = new Path(recursiveChild, parent.getName());
recursiveChild = new Path(recursiveParent, child.getName());
assertPathExists("Recursive parent not found", recursiveParent);
assertPathExists("Recursive child not found", recursiveChild);
}
}

View File

@ -0,0 +1,336 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
public abstract class AbstractContractCopyFromLocalTest extends
AbstractFSContractTestBase {
private static final Charset ASCII = StandardCharsets.US_ASCII;
private File file;
@Override
public void teardown() throws Exception {
super.teardown();
if (file != null) {
file.delete();
}
}
@Test
public void testCopyEmptyFile() throws Throwable {
file = File.createTempFile("test", ".txt");
Path dest = copyFromLocal(file, true);
assertPathExists("uploaded file not found", dest);
}
@Test
public void testCopyFile() throws Throwable {
String message = "hello";
file = createTempFile(message);
Path dest = copyFromLocal(file, true);
assertPathExists("uploaded file not found", dest);
assertTrue("source file deleted", Files.exists(file.toPath()));
FileSystem fs = getFileSystem();
FileStatus status = fs.getFileStatus(dest);
assertEquals("File length not equal " + status,
message.getBytes(ASCII).length, status.getLen());
assertFileTextEquals(dest, message);
}
@Test
public void testCopyFileNoOverwrite() throws Throwable {
file = createTempFile("hello");
copyFromLocal(file, true);
intercept(PathExistsException.class,
() -> copyFromLocal(file, false));
}
@Test
public void testCopyFileOverwrite() throws Throwable {
file = createTempFile("hello");
Path dest = copyFromLocal(file, true);
String updated = "updated";
FileUtils.write(file, updated, ASCII);
copyFromLocal(file, true);
assertFileTextEquals(dest, updated);
}
@Test
public void testCopyMissingFile() throws Throwable {
describe("Copying a file that's not there must fail.");
file = createTempFile("test");
file.delete();
// first upload to create
intercept(FileNotFoundException.class, "",
() -> copyFromLocal(file, true));
}
@Test
public void testSourceIsFileAndDelSrcTrue() throws Throwable {
describe("Source is a file delSrc flag is set to true");
file = createTempFile("test");
copyFromLocal(file, false, true);
assertFalse("Source file not deleted", Files.exists(file.toPath()));
}
@Test
public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
describe("Source is a file and destination is a directory. " +
"File must be copied inside the directory.");
file = createTempFile("test");
Path source = new Path(file.toURI());
FileSystem fs = getFileSystem();
File dir = createTempDirectory("test");
Path destination = fileToPath(dir);
// Make sure there's nothing already existing at destination
fs.delete(destination, false);
mkdirs(destination);
fs.copyFromLocalFile(source, destination);
Path expectedFile = path(dir.getName() + "/" + source.getName());
assertPathExists("File not copied into directory", expectedFile);
}
@Test
public void testSourceIsFileAndDestinationIsNonExistentDirectory()
throws Throwable {
describe("Source is a file and destination directory does not exist. " +
"Copy operation must still work.");
file = createTempFile("test");
Path source = new Path(file.toURI());
FileSystem fs = getFileSystem();
File dir = createTempDirectory("test");
Path destination = fileToPath(dir);
fs.delete(destination, false);
assertPathDoesNotExist("Destination not deleted", destination);
fs.copyFromLocalFile(source, destination);
assertPathExists("Destination doesn't exist.", destination);
}
@Test
public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable {
describe("Source is a directory with files, copy must copy all" +
" dir contents to destination");
String firstChild = "childOne";
String secondChild = "childTwo";
File parent = createTempDirectory("parent");
File root = parent.getParentFile();
File childFile = createTempFile(parent, firstChild, firstChild);
File secondChildFile = createTempFile(parent, secondChild, secondChild);
copyFromLocal(parent, false);
assertPathExists("Parent directory not copied", fileToPath(parent));
assertFileTextEquals(fileToPath(childFile, root), firstChild);
assertFileTextEquals(fileToPath(secondChildFile, root), secondChild);
}
@Test
public void testSrcIsEmptyDirWithCopySuccessful() throws Throwable {
describe("Source is an empty directory, copy must succeed");
File source = createTempDirectory("source");
Path dest = copyFromLocal(source, false);
assertPathExists("Empty directory not copied", dest);
}
@Test
public void testSrcIsDirWithOverwriteOptions() throws Throwable {
describe("Source is a directory, destination exists and " +
"must be overwritten.");
FileSystem fs = getFileSystem();
File source = createTempDirectory("source");
Path sourcePath = new Path(source.toURI());
String contents = "test file";
File child = createTempFile(source, "child", contents);
Path dest = path(source.getName()).getParent();
fs.copyFromLocalFile(sourcePath, dest);
intercept(PathExistsException.class,
() -> fs.copyFromLocalFile(false, false,
sourcePath, dest));
String updated = "updated contents";
FileUtils.write(child, updated, ASCII);
fs.copyFromLocalFile(sourcePath, dest);
assertPathExists("Parent directory not copied", fileToPath(source));
assertFileTextEquals(fileToPath(child, source.getParentFile()),
updated);
}
@Test
public void testSrcIsDirWithDelSrcOptions() throws Throwable {
describe("Source is a directory containing a file and delSrc flag is set" +
", this must delete the source after the copy.");
File source = createTempDirectory("source");
String contents = "child file";
File child = createTempFile(source, "child", contents);
copyFromLocal(source, false, true);
Path dest = fileToPath(child, source.getParentFile());
assertFalse("Directory not deleted", Files.exists(source.toPath()));
assertFileTextEquals(dest, contents);
}
/*
* The following path is being created on disk and copied over
* /parent/ (directory)
* /parent/test1.txt
* /parent/child/test.txt
* /parent/secondChild/ (directory)
*/
@Test
public void testCopyTreeDirectoryWithoutDelete() throws Throwable {
File srcDir = createTempDirectory("parent");
File childDir = createTempDirectory(srcDir, "child");
File secondChild = createTempDirectory(srcDir, "secondChild");
File parentFile = createTempFile(srcDir, "test1", ".txt");
File childFile = createTempFile(childDir, "test2", ".txt");
copyFromLocal(srcDir, false, false);
File root = srcDir.getParentFile();
assertPathExists("Parent directory not found",
fileToPath(srcDir));
assertPathExists("Child directory not found",
fileToPath(childDir, root));
assertPathExists("Second child directory not found",
fileToPath(secondChild, root));
assertPathExists("Parent file not found",
fileToPath(parentFile, root));
assertPathExists("Child file not found",
fileToPath(childFile, root));
}
@Test
public void testCopyDirectoryWithDelete() throws Throwable {
java.nio.file.Path srcDir = Files.createTempDirectory("parent");
Files.createTempFile(srcDir, "test1", ".txt");
Path src = new Path(srcDir.toUri());
Path dst = path(srcDir.getFileName().toString());
getFileSystem().copyFromLocalFile(true, true, src, dst);
assertFalse("Source directory was not deleted",
Files.exists(srcDir));
}
@Test
public void testSourceIsDirectoryAndDestinationIsFile() throws Throwable {
describe("Source is a directory and destination is a file must fail");
File file = createTempFile("local");
File source = createTempDirectory("srcDir");
Path destination = copyFromLocal(file, false);
Path sourcePath = new Path(source.toURI());
intercept(FileAlreadyExistsException.class,
() -> getFileSystem().copyFromLocalFile(false, true,
sourcePath, destination));
}
protected Path fileToPath(File file) throws IOException {
return path(file.getName());
}
protected Path fileToPath(File file, File parent) throws IOException {
return path(parent
.toPath()
.relativize(file.toPath())
.toString());
}
protected File createTempDirectory(String name) throws IOException {
return Files.createTempDirectory(name).toFile();
}
protected Path copyFromLocal(File srcFile, boolean overwrite) throws
IOException {
return copyFromLocal(srcFile, overwrite, false);
}
protected Path copyFromLocal(File srcFile, boolean overwrite, boolean delSrc)
throws IOException {
Path src = new Path(srcFile.toURI());
Path dst = path(srcFile.getName());
getFileSystem().copyFromLocalFile(delSrc, overwrite, src, dst);
return dst;
}
/**
* Create a temp file with some text.
* @param text text for the file
* @return the file
* @throws IOException on a failure
*/
protected File createTempFile(String text) throws IOException {
File f = File.createTempFile("test", ".txt");
FileUtils.write(f, text, ASCII);
return f;
}
protected File createTempFile(File parent, String name, String text)
throws IOException {
File f = File.createTempFile(name, ".txt", parent);
FileUtils.write(f, text, ASCII);
return f;
}
protected File createTempDirectory(File parent, String name)
throws IOException {
return Files.createTempDirectory(parent.toPath(), name).toFile();
}
private void assertFileTextEquals(Path path, String expected)
throws IOException {
assertEquals("Wrong data in " + path,
expected, IOUtils.toString(getFileSystem().open(path), ASCII));
}
}

View File

@ -81,6 +81,7 @@ import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.amazonaws.event.ProgressListener;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -3838,73 +3839,82 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
// innerCopyFromLocalFile(delSrc, overwrite, src, dst);
super.copyFromLocalFile(delSrc, overwrite, src, dst);
return null;
});
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
() -> new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks()).execute());
}
/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
*
* <i>HADOOP-15932:</i> this method has been unwired from
* {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
* it is extended to list and copy whole directories.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src Source path: must be on local filesystem
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false, or if the destination is a directory.
* @throws FileNotFoundException if the source file does not exit
* @throws AmazonClientException failure in the AWS SDK
* @throws IllegalArgumentException if the source path is not on the local FS
*/
@Retries.RetryTranslated
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Copying local file from {} to {}", src, dst);
// Since we have a local file, we don't need to stream into a temporary file
protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
if (!srcfile.exists()) {
throw new FileNotFoundException("No file: " + src);
}
if (!srcfile.isFile()) {
throw new FileNotFoundException("Not a file: " + src);
return new CopyFromLocalCallbacksImpl(local);
}
protected class CopyFromLocalCallbacksImpl implements
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
private final LocalFileSystem local;
private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
this.local = local;
}
try {
FileStatus status = innerGetFileStatus(dst, false, StatusProbeEnum.ALL);
if (!status.isFile()) {
throw new FileAlreadyExistsException(dst + " exists and is not a file");
}
if (!overwrite) {
throw new FileAlreadyExistsException(dst + " already exists");
}
} catch (FileNotFoundException e) {
// no destination, all is well
@Override
public RemoteIterator<LocatedFileStatus> listLocalStatusIterator(
final Path path) throws IOException {
return local.listLocatedStatus(path);
}
final String key = pathToKey(dst);
final ObjectMetadata om = newObjectMetadata(srcfile.length());
Progressable progress = null;
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true,
() -> executePut(putObjectRequest, progress));
if (delSrc) {
local.delete(src, false);
@Override
public File pathToLocalFile(Path path) {
return local.pathToFile(path);
}
@Override
public boolean deleteLocal(Path path, boolean recursive) throws IOException {
return local.delete(path, recursive);
}
@Override
public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
trackDurationAndSpan(
OBJECT_PUT_REQUESTS,
to,
() -> {
final String key = pathToKey(to);
final ObjectMetadata om = newObjectMetadata(file.length());
Progressable progress = null;
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
S3AFileSystem.this.invoker.retry(
"putObject(" + "" + ")", to.toString(),
true,
() -> executePut(putObjectRequest, progress));
return null;
});
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return S3AFileSystem.this.getFileStatus(f);
}
@Override
public boolean createEmptyDir(Path path, StoreContext storeContext)
throws IOException {
return trackDuration(getDurationTrackerFactory(),
INVOCATION_MKDIRS.getSymbol(),
new MkdirOperation(
storeContext,
path,
createMkdirOperationCallbacks()));
}
}

View File

@ -0,0 +1,540 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.comparators.ReverseComparator;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
/**
* Implementation of CopyFromLocalOperation.
* <p>
* This operation copies a file or directory (recursively) from a local
* FS to an object store. Initially, this operation has been developed for
* S3 (s3a) interaction, however, there's minimal work needed for it to
* work with other stores.
* </p>
* <p>How the uploading of files works:</p>
* <ul>
* <li> all source files and directories are scanned through;</li>
* <li> the LARGEST_N_FILES start uploading; </li>
* <li> the remaining files are shuffled and uploaded; </li>
* <li>
* any remaining empty directory is uploaded too to preserve local
* tree structure.
* </li>
* </ul>
*/
public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
/**
* Largest N files to be uploaded first.
*/
private static final int LARGEST_N_FILES = 5;
private static final Logger LOG = LoggerFactory.getLogger(
CopyFromLocalOperation.class);
/**
* Callbacks to be used by this operation for external / IO actions.
*/
private final CopyFromLocalOperationCallbacks callbacks;
/**
* Delete source after operation finishes.
*/
private final boolean deleteSource;
/**
* Overwrite destination files / folders.
*/
private final boolean overwrite;
/**
* Source path to file / directory.
*/
private final Path source;
/**
* Async operations executor.
*/
private final ListeningExecutorService executor;
/**
* Destination path.
*/
private Path destination;
/**
* Destination file status.
*/
private FileStatus destStatus;
public CopyFromLocalOperation(
final StoreContext storeContext,
Path source,
Path destination,
boolean deleteSource,
boolean overwrite,
CopyFromLocalOperationCallbacks callbacks) {
super(storeContext);
this.callbacks = callbacks;
this.deleteSource = deleteSource;
this.overwrite = overwrite;
this.source = source;
this.destination = destination;
// Capacity of 1 is a safe default for now since transfer manager can also
// spawn threads when uploading bigger files.
this.executor = MoreExecutors.listeningDecorator(
storeContext.createThrottledExecutor(1)
);
}
/**
* Executes the {@link CopyFromLocalOperation}.
*
* @throws IOException - if there are any failures with upload or deletion
* of files. Check {@link CopyFromLocalOperationCallbacks} for specifics.
* @throws PathExistsException - if the path exists and no overwrite flag
* is set OR if the source is file and destination is a directory
*/
@Override
@Retries.RetryTranslated
public Void execute()
throws IOException, PathExistsException {
LOG.debug("Copying local file from {} to {}", source, destination);
File sourceFile = callbacks.pathToLocalFile(source);
updateDestStatus(destination);
// Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
&& sourceFile.isDirectory()) {
destination = new Path(destination, sourceFile.getName());
LOG.debug("Destination updated to: {}", destination);
updateDestStatus(destination);
}
checkSource(sourceFile);
checkDestination(destination, sourceFile, overwrite);
uploadSourceFromFS();
if (deleteSource) {
callbacks.deleteLocal(source, true);
}
return null;
}
/**
* Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
* operation on the provided destination and updates the internal status of
* destStatus field.
*
* @param dest - destination Path
* @throws IOException if getFileStatus fails
*/
private void updateDestStatus(Path dest) throws IOException {
try {
destStatus = callbacks.getFileStatus(dest);
} catch (FileNotFoundException e) {
destStatus = null;
}
}
/**
* Starts async upload operations for files. Creating an empty directory
* classifies as a "file upload".
*
* Check {@link CopyFromLocalOperation} for details on the order of
* operations.
*
* @throws IOException - if listing or upload fail
*/
private void uploadSourceFromFS() throws IOException {
RemoteIterator<LocatedFileStatus> localFiles = listFilesAndDirs(source);
List<CompletableFuture<Void>> activeOps = new ArrayList<>();
// After all files are traversed, this set will contain only emptyDirs
Set<Path> emptyDirs = new HashSet<>();
List<UploadEntry> entries = new ArrayList<>();
while (localFiles.hasNext()) {
LocatedFileStatus sourceFile = localFiles.next();
Path sourceFilePath = sourceFile.getPath();
// Directory containing this file / directory isn't empty
emptyDirs.remove(sourceFilePath.getParent());
if (sourceFile.isDirectory()) {
emptyDirs.add(sourceFilePath);
continue;
}
Path destPath = getFinalPath(sourceFilePath);
// UploadEntries: have a destination path, a file size
entries.add(new UploadEntry(
sourceFilePath,
destPath,
sourceFile.getLen()));
}
if (localFiles instanceof Closeable) {
IOUtils.closeStream((Closeable) localFiles);
}
// Sort all upload entries based on size
entries.sort(new ReverseComparator(new UploadEntry.SizeComparator()));
// Take only top most N entries and upload
final int sortedUploadsCount = Math.min(LARGEST_N_FILES, entries.size());
List<UploadEntry> markedForUpload = new ArrayList<>();
for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) {
UploadEntry uploadEntry = entries.get(uploadNo);
File file = callbacks.pathToLocalFile(uploadEntry.source);
activeOps.add(submitUpload(file, uploadEntry));
markedForUpload.add(uploadEntry);
}
// No files found, it's empty source directory
if (entries.isEmpty()) {
emptyDirs.add(source);
}
// Shuffle all remaining entries and upload them
entries.removeAll(markedForUpload);
Collections.shuffle(entries);
for (UploadEntry uploadEntry : entries) {
File file = callbacks.pathToLocalFile(uploadEntry.source);
activeOps.add(submitUpload(file, uploadEntry));
}
for (Path emptyDir : emptyDirs) {
Path emptyDirPath = getFinalPath(emptyDir);
activeOps.add(submitCreateEmptyDir(emptyDirPath));
}
waitForCompletion(activeOps);
}
/**
* Async call to create an empty directory.
*
* @param dir directory path
* @return the submitted future
*/
private CompletableFuture<Void> submitCreateEmptyDir(Path dir) {
return submit(executor, callableWithinAuditSpan(
getAuditSpan(), () -> {
callbacks.createEmptyDir(dir, getStoreContext());
return null;
}
));
}
/**
* Async call to upload a file.
*
* @param file - File to be uploaded
* @param uploadEntry - Upload entry holding the source and destination
* @return the submitted future
*/
private CompletableFuture<Void> submitUpload(
File file,
UploadEntry uploadEntry) {
return submit(executor, callableWithinAuditSpan(
getAuditSpan(), () -> {
callbacks.copyLocalFileFromTo(
file,
uploadEntry.source,
uploadEntry.destination);
return null;
}
));
}
/**
* Checks the source before upload starts.
*
* @param src - Source file
* @throws FileNotFoundException - if the file isn't found
*/
private void checkSource(File src)
throws FileNotFoundException {
if (!src.exists()) {
throw new FileNotFoundException("No file: " + src.getPath());
}
}
/**
* Check the destination path and make sure it's compatible with the source,
* i.e. source and destination are both files / directories.
*
* @param dest - Destination path
* @param src - Source file
* @param overwrite - Should source overwrite destination
* @throws PathExistsException - If the destination path exists and no
* overwrite flag is set
* @throws FileAlreadyExistsException - If source is file and destination is path
*/
private void checkDestination(
Path dest,
File src,
boolean overwrite) throws PathExistsException,
FileAlreadyExistsException {
if (!getDestStatus().isPresent()) {
return;
}
if (src.isDirectory() && getDestStatus().get().isFile()) {
throw new FileAlreadyExistsException(
"Source '" + src.getPath() + "' is directory and " +
"destination '" + dest + "' is file");
}
if (!overwrite) {
throw new PathExistsException(dest + " already exists");
}
}
/**
* Get the final path of a source file with regards to its destination.
*
* @param src - source path
* @return - the final path for the source file to be uploaded to
* @throws PathIOException - if a relative path can't be created
*/
private Path getFinalPath(Path src) throws PathIOException {
URI currentSrcUri = src.toUri();
URI relativeSrcUri = source.toUri().relativize(currentSrcUri);
if (relativeSrcUri.equals(currentSrcUri)) {
throw new PathIOException("Cannot get relative path for URI:"
+ relativeSrcUri);
}
Optional<FileStatus> status = getDestStatus();
if (!relativeSrcUri.getPath().isEmpty()) {
return new Path(destination, relativeSrcUri.getPath());
} else if (status.isPresent() && status.get().isDirectory()) {
// file to dir
return new Path(destination, src.getName());
} else {
// file to file
return destination;
}
}
private Optional<FileStatus> getDestStatus() {
return Optional.ofNullable(destStatus);
}
/**
* {@link RemoteIterator} which lists all of the files and directories for
* a given path. It's strikingly similar to
* {@link org.apache.hadoop.fs.LocalFileSystem#listFiles(Path, boolean)}
* however with the small addition that it includes directories.
*
* @param path - Path to list files and directories from
* @return - an iterator
* @throws IOException - if listing of a path file fails
*/
private RemoteIterator<LocatedFileStatus> listFilesAndDirs(Path path)
throws IOException {
return new RemoteIterator<LocatedFileStatus>() {
private final Stack<RemoteIterator<LocatedFileStatus>> iterators =
new Stack<>();
private RemoteIterator<LocatedFileStatus> current =
callbacks.listLocalStatusIterator(path);
private LocatedFileStatus curFile;
@Override
public boolean hasNext() throws IOException {
while (curFile == null) {
if (current.hasNext()) {
handleFileStat(current.next());
} else if (!iterators.empty()) {
current = iterators.pop();
} else {
return false;
}
}
return true;
}
/**
* Process the input stat.
* If it is a file or directory return the file stat.
* If it is a directory, traverse the directory;
* @param stat input status
* @throws IOException if any IO error occurs
*/
private void handleFileStat(LocatedFileStatus stat)
throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else { // directory
curFile = stat;
iterators.push(current);
current = callbacks.listLocalStatusIterator(stat.getPath());
}
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus result = curFile;
curFile = null;
return result;
}
throw new NoSuchElementException("No more entry in "
+ path);
}
};
}
/**
* <p>Represents an entry for a file to be moved.</p>
* <p>
* Helpful with sorting files by their size and keeping track of path
* information for the upload.
* </p>
*/
private static final class UploadEntry {
private final Path source;
private final Path destination;
private final long size;
private UploadEntry(Path source, Path destination, long size) {
this.source = source;
this.destination = destination;
this.size = size;
}
/**
* Compares {@link UploadEntry} objects and produces DESC ordering.
*/
static class SizeComparator implements Comparator<UploadEntry>,
Serializable {
@Override
public int compare(UploadEntry entry1, UploadEntry entry2) {
return Long.compare(entry1.size, entry2.size);
}
}
}
/**
* Define the contract for {@link CopyFromLocalOperation} to interact
* with any external resources.
*/
public interface CopyFromLocalOperationCallbacks {
/**
* List all entries (files AND directories) for a path.
*
* @param path - path to list
* @return an iterator for all entries
* @throws IOException - for any failure
*/
RemoteIterator<LocatedFileStatus> listLocalStatusIterator(Path path)
throws IOException;
/**
* Get the file status for a path.
*
* @param path - target path
* @return FileStatus
* @throws IOException - for any failure
*/
FileStatus getFileStatus(Path path) throws IOException;
/**
* Get the file from a path.
*
* @param path - target path
* @return file at path
*/
File pathToLocalFile(Path path);
/**
* Delete file / directory at path.
*
* @param path - target path
* @param recursive - recursive deletion
* @return boolean result of operation
* @throws IOException for any failure
*/
boolean deleteLocal(Path path, boolean recursive) throws IOException;
/**
* Copy / Upload a file from a source path to a destination path.
*
* @param file - target file
* @param source - source path
* @param destination - destination path
* @throws IOException for any failure
*/
void copyLocalFileFromTo(
File file,
Path source,
Path destination) throws IOException;
/**
* Create empty directory at path. Most likely an upload operation.
*
* @param path - target path
* @param storeContext - store context
* @return boolean result of operation
* @throws IOException for any failure
*/
boolean createEmptyDir(Path path, StoreContext storeContext)
throws IOException;
}
}

View File

@ -19,143 +19,41 @@
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.junit.Test;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link S3AFileSystem#copyFromLocalFile(boolean, boolean, Path, Path)}.
* Some of the tests have been disabled pending a fix for HADOOP-15932 and
* recursive directory copying; the test cases themselves may be obsolete.
*/
public class ITestS3ACopyFromLocalFile extends AbstractS3ATestBase {
private static final Charset ASCII = StandardCharsets.US_ASCII;
private File file;
public class ITestS3ACopyFromLocalFile extends
AbstractContractCopyFromLocalTest {
@Override
public void teardown() throws Exception {
super.teardown();
if (file != null) {
file.delete();
}
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Test
public void testCopyEmptyFile() throws Throwable {
file = File.createTempFile("test", ".txt");
Path dest = upload(file, true);
assertPathExists("uploaded file", dest);
}
@Test
public void testCopyFile() throws Throwable {
String message = "hello";
file = createTempFile(message);
Path dest = upload(file, true);
assertPathExists("uploaded file not found", dest);
S3AFileSystem fs = getFileSystem();
FileStatus status = fs.getFileStatus(dest);
assertEquals("File length of " + status,
message.getBytes(ASCII).length, status.getLen());
assertFileTextEquals(dest, message);
}
public void assertFileTextEquals(Path path, String expected)
throws IOException {
assertEquals("Wrong data in " + path,
expected, IOUtils.toString(getFileSystem().open(path), ASCII));
}
@Test
public void testCopyFileNoOverwrite() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
// HADOOP-15932: the exception type changes here
intercept(PathExistsException.class,
() -> upload(file, false));
}
@Test
public void testCopyFileOverwrite() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
String updated = "updated";
FileUtils.write(file, updated, ASCII);
upload(file, true);
assertFileTextEquals(dest, updated);
}
@Test
@Ignore("HADOOP-15932")
public void testCopyFileNoOverwriteDirectory() throws Throwable {
file = createTempFile("hello");
Path dest = upload(file, true);
S3AFileSystem fs = getFileSystem();
fs.delete(dest, false);
fs.mkdirs(dest);
intercept(FileAlreadyExistsException.class,
() -> upload(file, true));
}
@Test
public void testCopyMissingFile() throws Throwable {
file = File.createTempFile("test", ".txt");
file.delete();
// first upload to create
intercept(FileNotFoundException.class, "",
() -> upload(file, true));
}
@Test
@Ignore("HADOOP-15932")
public void testCopyDirectoryFile() throws Throwable {
file = File.createTempFile("test", ".txt");
// first upload to create
intercept(FileNotFoundException.class, "Not a file",
() -> upload(file.getParentFile(), true));
}
@Test
public void testLocalFilesOnly() throws Throwable {
Path dst = path("testLocalFilesOnly");
describe("Copying into other file systems must fail");
Path dest = fileToPath(createTempDirectory("someDir"));
intercept(IllegalArgumentException.class,
() -> {
getFileSystem().copyFromLocalFile(false, true, dst, dst);
return "copy successful";
});
() -> getFileSystem().copyFromLocalFile(false, true, dest, dest));
}
public Path upload(File srcFile, boolean overwrite) throws IOException {
Path src = new Path(srcFile.toURI());
Path dst = path(srcFile.getName());
getFileSystem().copyFromLocalFile(false, overwrite, src, dst);
return dst;
}
@Test
public void testOnlyFromLocal() throws Throwable {
describe("Copying must be from a local file system");
File source = createTempFile("someFile");
Path dest = copyFromLocal(source, true);
/**
* Create a temp file with some text.
* @param text text for the file
* @return the file
* @throws IOException on a failure
*/
public File createTempFile(String text) throws IOException {
File f = File.createTempFile("test", ".txt");
FileUtils.write(f, text, ASCII);
return f;
intercept(IllegalArgumentException.class,
() -> getFileSystem().copyFromLocalFile(true, true, dest, dest));
}
}