HDFS-13934. Multipart uploaders to be created through FileSystem/FileContext.

Contributed by Steve Loughran.

Change-Id: Iebd34140c1a0aa71f44a3f4d0fee85f6bdf123a3
Steve Loughran 2020-07-13 13:30:02 +01:00
parent b97fea65e7
commit b9fa5e0182
45 changed files with 2603 additions and 794 deletions


@ -1382,4 +1382,34 @@ public abstract class AbstractFileSystem implements PathCapabilities {
return false;
}
}
/**
* Create a multipart uploader.
* @param basePath file path under which all files are uploaded
* @return a MultipartUploaderBuilder object to build the uploader
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException {
methodNotSupported();
return null;
}
/**
* Helper method that throws an {@link UnsupportedOperationException} for the
* current {@link FileSystem} method being called.
*/
protected final void methodNotSupported() {
// The order of the stacktrace elements is (from top to bottom):
// - java.lang.Thread.getStackTrace
// - org.apache.hadoop.fs.FileSystem.methodNotSupported
// - <the FileSystem method>
// therefore, to find out the current method name, we use the element at
// index 2.
String name = Thread.currentThread().getStackTrace()[2].getMethodName();
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support method " + name);
}
}
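
As an aside on the `methodNotSupported()` helper above, here is a small standalone sketch (not part of this patch; the class and method names are invented) of the stack-walking trick and the message a caller of an unimplemented operation would see:

```java
// Standalone illustration of the stack-trace trick used by methodNotSupported():
// element 2 of the stack trace names the public method the caller invoked.
public class StackNameDemo {

  private static void methodNotSupported() {
    // index 0: Thread.getStackTrace, index 1: methodNotSupported, index 2: caller
    String name = Thread.currentThread().getStackTrace()[2].getMethodName();
    throw new UnsupportedOperationException(
        StackNameDemo.class.getCanonicalName() + " does not support method " + name);
  }

  public static void createMultipartUploader() {
    methodNotSupported();
  }

  public static void main(String[] args) {
    try {
      createMultipartUploader();
    } catch (UnsupportedOperationException e) {
      // prints "StackNameDemo does not support method createMultipartUploader"
      System.out.println(e.getMessage());
    }
  }
}
```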


@ -131,4 +131,12 @@ public final class CommonPathCapabilities {
@InterfaceStability.Unstable
public static final String FS_EXPERIMENTAL_BATCH_LISTING =
"fs.capability.batch.listing";
/**
* Does the store support multipart uploading?
* Value: {@value}.
*/
public static final String FS_MULTIPART_UPLOADER =
"fs.capability.multipart.uploader";
}
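
To make the intended use concrete, here is a minimal client-side sketch (the path and configuration are hypothetical, not from this patch) which probes the new capability before asking for an uploader builder:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;

public class ProbeForMultipartSupport {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path base = new Path(args[0]);                 // e.g. hdfs://nn/user/alice/uploads
    FileSystem fs = base.getFileSystem(conf);
    // Stores which cannot do multipart uploads return false here.
    if (fs.hasPathCapability(base, CommonPathCapabilities.FS_MULTIPART_UPLOADER)) {
      try (MultipartUploader uploader = fs.createMultipartUploader(base).build()) {
        System.out.println("multipart uploads supported under " + base);
      }
    } else {
      System.out.println("no multipart upload support at " + base);
    }
  }
}
```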


@ -2957,4 +2957,31 @@ public class FileContext implements PathCapabilities {
(fs, p) -> fs.hasPathCapability(p, capability));
}
/**
* Return a set of server default configuration values based on path.
* @param path path to fetch server defaults
* @return server default configuration values for path
* @throws IOException an I/O error occurred
*/
public FsServerDefaults getServerDefaults(final Path path)
throws IOException {
return FsLinkResolution.resolve(this,
fixRelativePart(path),
(fs, p) -> fs.getServerDefaults(p));
}
/**
* Create a multipart uploader.
* @param basePath file path under which all files are uploaded
* @return a MultipartUploaderBuilder object to build the uploader
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException {
return FsLinkResolution.resolve(this,
fixRelativePart(basePath),
(fs, p) -> fs.createMultipartUploader(p));
}
}
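
The FileContext route behaves the same way, except that relative paths are fixed up and symlinks are resolved before the call reaches the target filesystem. A short sketch with an invented base path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;

public class FileContextUploaderExample {
  public static void main(String[] args) throws Exception {
    FileContext fc = FileContext.getFileContext(new Configuration());
    Path base = new Path("/user/alice/uploads");   // hypothetical base path
    try (MultipartUploader uploader = fc.createMultipartUploader(base).build()) {
      // parts are uploaded through the MultipartUploader API; see below
    }
  }
}
```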


@ -132,22 +132,35 @@ import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapa
* New methods may be marked as Unstable or Evolving for their initial release,
* as a warning that they are new and may change based on the
* experience of use in applications.
* <p></p>
* <b>Important note for developers</b>
*
* If you're making changes here to the public API or protected methods,
* <p></p>
* If you are making changes here to the public API or protected methods,
* you must review the following subclasses and make sure that
* they are filtering/passing through new methods as appropriate.
* <p></p>
*
* {@link FilterFileSystem}: methods are passed through.
* {@link FilterFileSystem}: methods are passed through. If not,
* then {@code TestFilterFileSystem.MustNotImplement} must be
* updated with the unsupported interface.
* Furthermore, if the new API's support is probed for via
* {@link #hasPathCapability(Path, String)} then
* {@link FilterFileSystem#hasPathCapability(Path, String)}
* must return false, always.
* <p></p>
* {@link ChecksumFileSystem}: checksums are created and
* verified.
* <p></p>
* {@code TestHarFileSystem} will need its {@code MustNotImplement}
* interface updated.
* <p></p>
*
* There are some external places your changes will break things.
* Do co-ordinate changes here.
* <p></p>
*
* HBase: HBoss
* <p></p>
* Hive: HiveShim23
* {@code shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java}
*
@ -4644,4 +4657,17 @@ public abstract class FileSystem extends Configured
}
/**
* Create a multipart uploader.
* @param basePath file path under which all files are uploaded
* @return a MultipartUploaderBuilder object to build the uploader
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException {
methodNotSupported();
return null;
}
}


@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* A <code>FilterFileSystem</code> contains
* some other file system, which it uses as
@ -728,7 +730,16 @@ public class FilterFileSystem extends FileSystem {
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
return fs.hasPathCapability(path, capability);
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
case CommonPathCapabilities.FS_EXPERIMENTAL_BATCH_LISTING:
// operations known to be unsupported, irrespective of what
// the wrapped class implements.
return false;
default:
// the feature is not implemented.
return fs.hasPathCapability(path, capability);
}
}
}


@ -448,4 +448,10 @@ public abstract class FilterFs extends AbstractFileSystem {
throws IOException {
return myFs.hasPathCapability(path, capability);
}
@Override
public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
throws IOException {
return myFs.createMultipartUploader(basePath);
}
}


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,26 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemMultipartUploader;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Support for HDFS multipart uploads, built on
* {@link FileSystem#concat(Path, Path[])}.
* This class allows access to package-scoped operations from classes
* in org.apache.hadoop.fs.impl and other file system implementations
* in the hadoop modules.
* This is absolutely not for use by any other application or library.
*/
public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
return new FileSystemMultipartUploader(fs);
}
return null;
@InterfaceAudience.Private
public class InternalOperations {
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public void rename(FileSystem fs, final Path src, final Path dst,
final Options.Rename...options) throws IOException {
fs.rename(src, dst, options);
}
}
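
A hedged sketch of the only intended use of this class: filesystem-internal code (such as the FileSystemMultipartUploader shown later in this patch) replacing a destination file with a freshly assembled temporary file. The helper class and names here are invented for illustration:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InternalOperations;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

// Hypothetical helper inside a filesystem implementation; not for application code.
final class CommitTempFile {
  static void commit(FileSystem fs, Path tmp, Path dest) throws IOException {
    // Replace dest with tmp using the deprecated overwrite-capable rename.
    new InternalOperations().rename(fs, tmp, dest, Options.Rename.OVERWRITE);
  }
}
```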


@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,45 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static com.google.common.base.Preconditions.checkArgument;
/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes. Users should:
* <ol>
* <li>Initialize an upload.</li>
* <li>Upload parts in any order.</li>
* <li>Complete the upload in order to have it materialize in the destination
* FS.</li>
* </ol>
* multiple nodes.
*/
@InterfaceAudience.Private
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class MultipartUploader implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(MultipartUploader.class);
public interface MultipartUploader extends Closeable {
/**
* Perform any cleanup.
* The upload is not required to support any operations after this.
* @throws IOException problems on close.
*/
@Override
public void close() throws IOException {
}
/**
* Initialize a multipart upload.
@ -61,94 +42,64 @@ public abstract class MultipartUploader implements Closeable {
* @return unique identifier associating part uploads.
* @throws IOException IO failure
*/
public abstract UploadHandle initialize(Path filePath) throws IOException;
CompletableFuture<UploadHandle> startUpload(Path filePath)
throws IOException;
/**
* Put part as part of a multipart upload.
* It is possible to have parts uploaded in any order (or in parallel).
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param partNumber Index of the part relative to others.
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
* @param partNumber Index of the part relative to others.
* @param uploadId Identifier from {@link #initialize(Path)}.
* @param lengthInBytes Target length to read from the stream.
* @return unique PartHandle identifier for the uploaded part.
* @throws IOException IO failure
*/
public abstract PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
CompletableFuture<PartHandle> putPart(
UploadHandle uploadId,
int partNumber,
Path filePath,
InputStream inputStream,
long lengthInBytes)
throws IOException;
/**
* Complete a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param filePath Target path for upload (as {@link #startUpload(Path)}.
* @param handles non-empty map of part number to part handle.
* from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
* from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
* @return unique PathHandle identifier for the uploaded file.
* @throws IOException IO failure
*/
public abstract PathHandle complete(Path filePath,
Map<Integer, PartHandle> handles,
UploadHandle multipartUploadId)
CompletableFuture<PathHandle> complete(
UploadHandle uploadId,
Path filePath,
Map<Integer, PartHandle> handles)
throws IOException;
/**
* Aborts a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param filePath Target path for upload (same as {@link #startUpload(Path)}.
* @throws IOException IO failure
* @return a future; the operation will have completed
*/
public abstract void abort(Path filePath, UploadHandle multipartUploadId)
CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
throws IOException;
/**
* Utility method to validate uploadIDs.
* @param uploadId Upload ID
* @throws IllegalArgumentException invalid ID
* Best-effort attempt to abort multipart uploads under a path.
* Not all implementations support this, and those which do may
* be vulnerable to eventually consistent listings of current uploads;
* some may be missed.
* @param path path to abort uploads under.
* @return a future to the number of entries aborted;
* -1 if aborting is unsupported
* @throws IOException IO failure
*/
protected void checkUploadId(byte[] uploadId)
throws IllegalArgumentException {
checkArgument(uploadId != null, "null uploadId");
checkArgument(uploadId.length > 0,
"Empty UploadId is not valid");
}
CompletableFuture<Integer> abortUploadsUnderPath(Path path) throws IOException;
/**
* Utility method to validate partHandles.
* @param partHandles handles
* @throws IllegalArgumentException if the parts are invalid
*/
protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
checkArgument(!partHandles.isEmpty(),
"Empty upload");
partHandles.keySet()
.stream()
.forEach(key ->
checkArgument(key > 0,
"Invalid part handle index %s", key));
}
/**
* Check all the arguments to the
* {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
* @param partNumber Index of the part relative to others.
* @param uploadId Identifier from {@link #initialize(Path)}.
* @param lengthInBytes Target length to read from the stream.
* @throws IllegalArgumentException invalid argument
*/
protected void checkPutArguments(Path filePath,
InputStream inputStream,
int partNumber,
UploadHandle uploadId,
long lengthInBytes) throws IllegalArgumentException {
checkArgument(filePath != null, "null filePath");
checkArgument(inputStream != null, "null inputStream");
checkArgument(partNumber > 0, "Invalid part number: %d", partNumber);
checkArgument(uploadId != null, "null uploadId");
checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes);
}
}
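
Putting the new asynchronous interface together, here is a sketch of an end-to-end upload (the destination path, part contents and sizes are invented; `awaitFuture()` is the helper from `FutureIOSupport` which the contract tests later in this patch also use):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;

public class MultipartUploadWalkthrough {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path base = new Path(args[0]);                 // e.g. s3a://bucket/uploads
    Path dest = new Path(base, "dataset.bin");     // hypothetical destination
    FileSystem fs = base.getFileSystem(conf);

    try (MultipartUploader uploader = fs.createMultipartUploader(base).build()) {
      UploadHandle upload = awaitFuture(uploader.startUpload(dest));

      Map<Integer, PartHandle> parts = new HashMap<>();
      byte[] part1 = "hello, ".getBytes(StandardCharsets.UTF_8);
      byte[] part2 = "multipart world".getBytes(StandardCharsets.UTF_8);
      // Parts may be uploaded in any order, from any process holding the handles.
      parts.put(1, awaitFuture(uploader.putPart(upload, 1, dest,
          new ByteArrayInputStream(part1), part1.length)));
      parts.put(2, awaitFuture(uploader.putPart(upload, 2, dest,
          new ByteArrayInputStream(part2), part2.length)));

      // Only after complete() does the file become visible at dest.
      PathHandle handle = awaitFuture(uploader.complete(upload, dest, parts));
      System.out.println("uploaded " + dest + "; path handle " + handle);
    }
  }
}
```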


@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import javax.annotation.Nonnull;
import java.io.IOException;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* Builder interface for Multipart uploaders.
* @param <S> type of the uploader which is built.
* @param <B> type of the builder itself.
*/
public interface MultipartUploaderBuilder<S extends MultipartUploader, B extends MultipartUploaderBuilder<S, B>>
extends FSBuilder<S, B> {
/**
* Set permission for the file.
*/
B permission(@Nonnull FsPermission perm);
/**
* Set the size of the buffer to be used.
*/
B bufferSize(int bufSize);
/**
* Set replication factor.
*/
B replication(short replica);
/**
* Set block size.
*/
B blockSize(long blkSize);
/**
* Add the CREATE flag: create the destination file if it does not exist.
*/
B create();
/**
* Set to true to overwrite the existing file.
* If set to false, an exception will be thrown when calling {@link #build()}
* if the file exists.
*/
B overwrite(boolean overwrite);
/**
* Append to an existing file (optional operation).
*/
B append();
/**
* Set checksum opt.
*/
B checksumOpt(@Nonnull Options.ChecksumOpt chksumOpt);
/**
* Build the multipart uploader to write to the file system.
*
* @throws IllegalArgumentException if the parameters are not valid.
* @throws IOException on IO failures while constructing the uploader.
*/
S build() throws IllegalArgumentException, IOException;
}
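
Because the builder extends `FSBuilder`, options can be chained before `build()`. A sketch, assuming the target store honours these hints (the buffer, block size and permission values below are arbitrary; stores are free to ignore options they do not support):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

final class TunedUploaderFactory {
  static MultipartUploader create(FileSystem fs, Path base) throws IOException {
    return fs.createMultipartUploader(base)
        .permission(FsPermission.getFileDefault()) // permission of the final file
        .bufferSize(1024 * 1024)                   // 1 MiB copy buffer
        .blockSize(128L * 1024 * 1024)             // 128 MiB blocks where applicable
        .build();
  }
}
```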


@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.ServiceLoader;
/**
* {@link ServiceLoader}-driven uploader API for storage services supporting
* multipart uploads.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class MultipartUploaderFactory {
public static final Logger LOG =
LoggerFactory.getLogger(MultipartUploaderFactory.class);
/**
* Multipart Uploaders listed as services.
*/
private static ServiceLoader<MultipartUploaderFactory> serviceLoader =
ServiceLoader.load(MultipartUploaderFactory.class,
MultipartUploaderFactory.class.getClassLoader());
// Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases.
static {
Iterator<MultipartUploaderFactory> iterServices = serviceLoader.iterator();
while (iterServices.hasNext()) {
iterServices.next();
}
}
/**
* Get the multipart loader for a specific filesystem.
* @param fs filesystem
* @param conf configuration
* @return an uploader, or null if one was found.
* @throws IOException failure during the creation process.
*/
public static MultipartUploader get(FileSystem fs, Configuration conf)
throws IOException {
MultipartUploader mpu = null;
for (MultipartUploaderFactory factory : serviceLoader) {
mpu = factory.createMultipartUploader(fs, conf);
if (mpu != null) {
break;
}
}
return mpu;
}
protected abstract MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) throws IOException;
}


@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UploadHandle;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Standard base class for Multipart Uploaders.
*/
public abstract class AbstractMultipartUploader implements MultipartUploader {
/**
* Base path of upload.
*/
private final Path basePath;
/**
* Instantiate.
* @param basePath base path
*/
protected AbstractMultipartUploader(final Path basePath) {
this.basePath = Objects.requireNonNull(basePath, "null path");
}
/**
* Perform any cleanup.
* The upload is not required to support any operations after this.
* @throws IOException problems on close.
*/
@Override
public void close() throws IOException {
}
protected Path getBasePath() {
return basePath;
}
/**
* Validate a path.
* @param path path to check.
*/
protected void checkPath(Path path) {
Objects.requireNonNull(path, "null path");
Preconditions.checkArgument(path.toString().startsWith(basePath.toString()),
"Path %s is not under %s", path, basePath);
}
/**
* Utility method to validate uploadIDs.
* @param uploadId Upload ID
* @throws IllegalArgumentException invalid ID
*/
protected void checkUploadId(byte[] uploadId)
throws IllegalArgumentException {
checkArgument(uploadId != null, "null uploadId");
checkArgument(uploadId.length > 0,
"Empty UploadId is not valid");
}
/**
* Utility method to validate partHandles.
* @param partHandles handles
* @throws IllegalArgumentException if the parts are invalid
*/
protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
checkArgument(!partHandles.isEmpty(),
"Empty upload");
partHandles.keySet()
.stream()
.forEach(key ->
checkArgument(key > 0,
"Invalid part handle index %s", key));
}
/**
* Check all the arguments to the
* {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
* operation.
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
* @param partNumber Index of the part relative to others.
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param lengthInBytes Target length to read from the stream.
* @throws IllegalArgumentException invalid argument
*/
protected void checkPutArguments(Path filePath,
InputStream inputStream,
int partNumber,
UploadHandle uploadId,
long lengthInBytes) throws IllegalArgumentException {
checkPath(filePath);
checkArgument(inputStream != null, "null inputStream");
checkArgument(partNumber > 0, "Invalid part number: %d", partNumber);
checkArgument(uploadId != null, "null uploadId");
checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes);
}
/**
* {@inheritDoc}.
* @param path path to abort uploads under.
* @return a future to -1.
* @throws IOException
*/
public CompletableFuture<Integer> abortUploadsUnderPath(Path path)
throws IOException {
checkPath(path);
CompletableFuture<Integer> f = new CompletableFuture<>();
f.complete(-1);
return f;
}
}
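
A compressed sketch of how a store-specific uploader might build on this base class, reusing its validation helpers and the `FutureIOSupport.eval()` added later in this patch. The `Demo*` class and the `*InStore` methods are invented placeholders, not a real Hadoop connector:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
import org.apache.hadoop.fs.impl.FutureIOSupport;

/** Hypothetical uploader for an imaginary store. */
class DemoMultipartUploader extends AbstractMultipartUploader {

  DemoMultipartUploader(Path basePath) {
    super(basePath);
  }

  @Override
  public CompletableFuture<UploadHandle> startUpload(Path filePath)
      throws IOException {
    checkPath(filePath);                 // must be under the base path
    return FutureIOSupport.eval(() -> startUploadInStore(filePath));
  }

  @Override
  public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
      int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)
      throws IOException {
    // validates path, stream, part number, upload id and length in one call
    checkPutArguments(filePath, inputStream, partNumber, uploadId, lengthInBytes);
    return FutureIOSupport.eval(() ->
        putPartInStore(uploadId, partNumber, filePath, inputStream, lengthInBytes));
  }

  @Override
  public CompletableFuture<PathHandle> complete(UploadHandle uploadId,
      Path filePath, Map<Integer, PartHandle> handles) throws IOException {
    checkPath(filePath);
    checkUploadId(uploadId.toByteArray());
    checkPartHandles(handles);
    return FutureIOSupport.eval(() -> completeInStore(uploadId, filePath, handles));
  }

  @Override
  public CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
      throws IOException {
    checkPath(filePath);
    return FutureIOSupport.eval(() -> {
      abortInStore(uploadId, filePath);
      return null;
    });
  }

  // Placeholder store interactions; a real implementation would talk to its store.
  private UploadHandle startUploadInStore(Path p) throws IOException {
    throw new IOException("not a real store");
  }
  private PartHandle putPartInStore(UploadHandle u, int n, Path p,
      InputStream in, long len) throws IOException {
    throw new IOException("not a real store");
  }
  private PathHandle completeInStore(UploadHandle u, Path p,
      Map<Integer, PartHandle> h) throws IOException {
    throw new IOException("not a real store");
  }
  private void abortInStore(UploadHandle u, Path p) throws IOException {
    throw new IOException("not a real store");
  }
}
```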


@ -14,24 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBPartHandle;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InternalOperations;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.fs.Path.mergePaths;
@ -50,40 +68,82 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemMultipartUploader extends MultipartUploader {
public class FileSystemMultipartUploader extends AbstractMultipartUploader {
private static final Logger LOG = LoggerFactory.getLogger(
FileSystemMultipartUploader.class);
private final FileSystem fs;
public FileSystemMultipartUploader(FileSystem fs) {
private final FileSystemMultipartUploaderBuilder builder;
private final FsPermission permission;
private final long blockSize;
private final Options.ChecksumOpt checksumOpt;
public FileSystemMultipartUploader(
final FileSystemMultipartUploaderBuilder builder,
FileSystem fs) {
super(builder.getPath());
this.builder = builder;
this.fs = fs;
blockSize = builder.getBlockSize();
checksumOpt = builder.getChecksumOpt();
permission = builder.getPermission();
}
@Override
public UploadHandle initialize(Path filePath) throws IOException {
Path collectorPath = createCollectorPath(filePath);
fs.mkdirs(collectorPath, FsPermission.getDirDefault());
public CompletableFuture<UploadHandle> startUpload(Path filePath)
throws IOException {
checkPath(filePath);
return FutureIOSupport.eval(() -> {
Path collectorPath = createCollectorPath(filePath);
fs.mkdirs(collectorPath, FsPermission.getDirDefault());
ByteBuffer byteBuffer = ByteBuffer.wrap(
collectorPath.toString().getBytes(Charsets.UTF_8));
return BBUploadHandle.from(byteBuffer);
ByteBuffer byteBuffer = ByteBuffer.wrap(
collectorPath.toString().getBytes(Charsets.UTF_8));
return BBUploadHandle.from(byteBuffer);
});
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
int partNumber, Path filePath,
InputStream inputStream,
long lengthInBytes)
throws IOException {
checkPutArguments(filePath, inputStream, partNumber, uploadId,
lengthInBytes);
return FutureIOSupport.eval(() -> innerPutPart(filePath,
inputStream, partNumber, uploadId, lengthInBytes));
}
private PartHandle innerPutPart(Path filePath,
InputStream inputStream,
int partNumber,
UploadHandle uploadId,
long lengthInBytes)
throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
checkUploadId(uploadIdByteArray);
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
Path partPath =
mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
new Path(Integer.toString(partNumber) + ".part")));
try(FSDataOutputStream fsDataOutputStream =
fs.createFile(partPath).build()) {
IOUtils.copy(inputStream, fsDataOutputStream, 4096);
new Path(partNumber + ".part")));
final FSDataOutputStreamBuilder fileBuilder = fs.createFile(partPath);
if (checksumOpt != null) {
fileBuilder.checksumOpt(checksumOpt);
}
if (permission != null) {
fileBuilder.permission(permission);
}
try (FSDataOutputStream fsDataOutputStream =
fileBuilder.blockSize(blockSize).build()) {
IOUtils.copy(inputStream, fsDataOutputStream,
this.builder.getBufferSize());
} finally {
cleanupWithLogger(LOG, inputStream);
}
@ -106,16 +166,36 @@ public class FileSystemMultipartUploader extends MultipartUploader {
private long totalPartsLen(List<Path> partHandles) throws IOException {
long totalLen = 0;
for (Path p: partHandles) {
for (Path p : partHandles) {
totalLen += fs.getFileStatus(p).getLen();
}
return totalLen;
}
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public PathHandle complete(Path filePath, Map<Integer, PartHandle> handleMap,
UploadHandle multipartUploadId) throws IOException {
public CompletableFuture<PathHandle> complete(
UploadHandle uploadId,
Path filePath,
Map<Integer, PartHandle> handleMap) throws IOException {
checkPath(filePath);
return FutureIOSupport.eval(() ->
innerComplete(uploadId, filePath, handleMap));
}
/**
* The upload complete operation.
* @param multipartUploadId the ID of the upload
* @param filePath path
* @param handleMap map of handles
* @return the path handle
* @throws IOException failure
*/
private PathHandle innerComplete(
UploadHandle multipartUploadId, Path filePath,
Map<Integer, PartHandle> handleMap) throws IOException {
checkPath(filePath);
checkUploadId(multipartUploadId.toByteArray());
@ -133,6 +213,13 @@ public class FileSystemMultipartUploader extends MultipartUploader {
})
.collect(Collectors.toList());
int count = partHandles.size();
// built up to identify duplicates -if the size of this set is
// below that of the number of parts, then there's a duplicate entry.
Set<Path> values = new HashSet<>(count);
values.addAll(partHandles);
Preconditions.checkArgument(values.size() == count,
"Duplicate PartHandles");
byte[] uploadIdByteArray = multipartUploadId.toByteArray();
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
@ -146,35 +233,30 @@ public class FileSystemMultipartUploader extends MultipartUploader {
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
new InternalOperations()
.rename(fs, filePathInsideCollector, filePath,
Options.Rename.OVERWRITE);
}
fs.delete(collectorPath, true);
return getPathHandle(filePath);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) throws IOException {
public CompletableFuture<Void> abort(UploadHandle uploadId,
Path filePath)
throws IOException {
checkPath(filePath);
byte[] uploadIdByteArray = uploadId.toByteArray();
checkUploadId(uploadIdByteArray);
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
// force a check for a file existing; raises FNFE if not found
fs.getFileStatus(collectorPath);
fs.delete(collectorPath, true);
return FutureIOSupport.eval(() -> {
// force a check for a file existing; raises FNFE if not found
fs.getFileStatus(collectorPath);
fs.delete(collectorPath, true);
return null;
});
}
/**
* Factory for creating MultipartUploaderFactory objects for file://
* filesystems.
*/
public static class Factory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals("file")) {
return new FileSystemMultipartUploader(fs);
}
return null;
}
}
}


@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* Builder for {@link FileSystemMultipartUploader}.
*/
public class FileSystemMultipartUploaderBuilder extends
MultipartUploaderBuilderImpl<FileSystemMultipartUploader, FileSystemMultipartUploaderBuilder> {
public FileSystemMultipartUploaderBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final Path path) {
super(fileSystem, path);
}
@Override
public FileSystemMultipartUploaderBuilder getThisBuilder() {
return this;
}
@Override
public FileSystemMultipartUploader build()
throws IllegalArgumentException, IOException {
return new FileSystemMultipartUploader(this, getFS());
}
@Override
public FileSystem getFS() {
return super.getFS();
}
@Override
public FsPermission getPermission() {
return super.getPermission();
}
@Override
public int getBufferSize() {
return super.getBufferSize();
}
@Override
public short getReplication() {
return super.getReplication();
}
@Override
public EnumSet<CreateFlag> getFlags() {
return super.getFlags();
}
@Override
public Options.ChecksumOpt getChecksumOpt() {
return super.getChecksumOpt();
}
@Override
protected long getBlockSize() {
return super.getBlockSize();
}
}


@ -21,6 +21,7 @@ package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -52,7 +53,7 @@ public final class FutureIOSupport {
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
public static <T> T awaitFuture(final Future<T> future)
public static <T> T awaitFuture(final Future<T> future)
throws InterruptedIOException, IOException, RuntimeException {
try {
return future.get();
@ -224,4 +225,29 @@ public final class FutureIOSupport {
}
}
}
/**
* Evaluate a CallableRaisingIOE in the current thread;
* any IOException raised is caught and stored in the returned future.
* @param callable callable to invoke
* @param <T> Return type.
* @return a future holding the result, or the captured IO failure.
* @throws UnsupportedOperationException fail fast if unsupported
* @throws IllegalArgumentException invalid argument
*/
public static <T> CompletableFuture<T> eval(
FunctionsRaisingIOE.CallableRaisingIOE<T> callable) {
CompletableFuture<T> result = new CompletableFuture<>();
try {
result.complete(callable.apply());
} catch (UnsupportedOperationException | IllegalArgumentException tx) {
// fail fast here
throw tx;
} catch (Throwable tx) {
// fail lazily here to ensure callers expect all File IO operations to
// surface later
result.completeExceptionally(tx);
}
return result;
}
}
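
A small standalone demonstration of the contract of `eval()` above: argument and unsupported-operation failures propagate immediately on the calling thread, while IO failures are captured in the returned future (the messages are invented):

```java
import java.io.FileNotFoundException;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.impl.FutureIOSupport;

public class EvalDemo {
  public static void main(String[] args) {
    // An IOException is deferred: the future completes exceptionally.
    CompletableFuture<String> deferred = FutureIOSupport.eval(() -> {
      throw new FileNotFoundException("no such part");
    });
    System.out.println("failed later? " + deferred.isCompletedExceptionally());

    // An IllegalArgumentException fails fast on the calling thread.
    try {
      FutureIOSupport.eval(() -> {
        throw new IllegalArgumentException("bad part number");
      });
    } catch (IllegalArgumentException e) {
      System.out.println("failed fast: " + e.getMessage());
    }
  }
}
```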


@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderBuilder;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/**
* Builder for {@link MultipartUploader} implementations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultipartUploaderBuilderImpl
<S extends MultipartUploader, B extends MultipartUploaderBuilder<S, B>>
extends AbstractFSBuilderImpl<S, B>
implements MultipartUploaderBuilder<S, B> {
private final FileSystem fs;
private FsPermission permission;
private int bufferSize;
private short replication;
private long blockSize;
private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
private ChecksumOpt checksumOpt;
/**
* Return the concrete implementation of the builder instance.
*/
public abstract B getThisBuilder();
/**
* Construct from a {@link FileContext}.
*
* @param fc FileContext
* @param p path.
* @throws IOException failure
*/
protected MultipartUploaderBuilderImpl(@Nonnull FileContext fc,
@Nonnull Path p) throws IOException {
super(checkNotNull(p));
checkNotNull(fc);
this.fs = null;
FsServerDefaults defaults = fc.getServerDefaults(p);
bufferSize = defaults.getFileBufferSize();
replication = defaults.getReplication();
blockSize = defaults.getBlockSize();
}
/**
* Constructor.
*/
protected MultipartUploaderBuilderImpl(@Nonnull FileSystem fileSystem,
@Nonnull Path p) {
super(fileSystem.makeQualified(checkNotNull(p)));
checkNotNull(fileSystem);
fs = fileSystem;
bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT);
replication = fs.getDefaultReplication(p);
blockSize = fs.getDefaultBlockSize(p);
}
protected FileSystem getFS() {
checkNotNull(fs);
return fs;
}
protected FsPermission getPermission() {
if (permission == null) {
permission = FsPermission.getFileDefault();
}
return permission;
}
/**
* Set permission for the file.
*/
@Override
public B permission(@Nonnull final FsPermission perm) {
checkNotNull(perm);
permission = perm;
return getThisBuilder();
}
protected int getBufferSize() {
return bufferSize;
}
/**
* Set the size of the buffer to be used.
*/
@Override
public B bufferSize(int bufSize) {
bufferSize = bufSize;
return getThisBuilder();
}
protected short getReplication() {
return replication;
}
/**
* Set replication factor.
*/
@Override
public B replication(short replica) {
replication = replica;
return getThisBuilder();
}
protected long getBlockSize() {
return blockSize;
}
/**
* Set block size.
*/
@Override
public B blockSize(long blkSize) {
blockSize = blkSize;
return getThisBuilder();
}
protected EnumSet<CreateFlag> getFlags() {
return flags;
}
/**
* Add the CREATE flag: create the destination file if it does not exist.
*/
@Override
public B create() {
flags.add(CreateFlag.CREATE);
return getThisBuilder();
}
/**
* Set to true to overwrite the existing file.
* If set to false, an exception will be thrown when calling {@link #build()}
* if the file exists.
*/
@Override
public B overwrite(boolean overwrite) {
if (overwrite) {
flags.add(CreateFlag.OVERWRITE);
} else {
flags.remove(CreateFlag.OVERWRITE);
}
return getThisBuilder();
}
/**
* Append to an existing file (optional operation).
*/
@Override
public B append() {
flags.add(CreateFlag.APPEND);
return getThisBuilder();
}
protected ChecksumOpt getChecksumOpt() {
return checksumOpt;
}
/**
* Set checksum opt.
*/
@Override
public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
checkNotNull(chksumOpt);
checksumOpt = chksumOpt;
return getThisBuilder();
}
}

View File

@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.FileSystemMultipartUploader$Factory


@ -14,14 +14,14 @@
<!-- ============================================================= -->
<!-- CLASS: MultipartUploader -->
<!-- INTERFACE: MultipartUploader -->
<!-- ============================================================= -->
# class `org.apache.hadoop.fs.MultipartUploader`
# interface `org.apache.hadoop.fs.MultipartUploader`
<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
The abstract `MultipartUploader` class is the original class to upload a file
The `MultipartUploader` can upload a file
using multiple parts to Hadoop-supported filesystems. The benefit of a
multipart upload is that the file can be uploaded from multiple clients or
processes in parallel and the results will not be visible to other clients until
@ -30,13 +30,12 @@ the `complete` function is called.
When implemented by an object store, uploaded data may incur storage charges,
even before it is visible in the filesystems. Users of this API must be diligent
and always perform best-effort attempts to complete or abort the upload.
The `abortUploadsUnderPath(path)` operation can help here.
## Invariants
All the requirements of a valid MultipartUploader are considered implicit
All the requirements of a valid `MultipartUploader` are considered implicit
preconditions and postconditions:
all operations on a valid MultipartUploader MUST result in a new
MultipartUploader that is also valid.
The operations of a single multipart upload may take place across different
instances of a multipart uploader, across different processes and hosts.
@ -45,16 +44,28 @@ It is therefore a requirement that:
1. All state needed to upload a part, complete an upload or abort an upload
must be contained within or retrievable from an upload handle.
1. If an upload handle is marshalled to another process, then, if the
receiving process has the correct permissions, it may participate in the
upload, by uploading one or more parts, by completing an upload, and/or by
aborting the upload.
1. That handle MUST be serializable; it MUST be deserializable to different
processes executing the exact same version of Hadoop.
1. different hosts/processes MAY upload different parts, sequentially or
simultaneously. The order in which they are uploaded to the filesystem
MUST NOT constrain the order in which the data is stored in the final file.
1. An upload MAY be completed on a different instance than any which uploaded
parts.
1. The output of an upload MUST NOT be visible at the final destination
until the upload completes.
1. It is not an error if a single multipart uploader instance initiates
or completes multiple uploads to the same destination sequentially,
irrespective of whether or not the store supports concurrent uploads.
## Concurrency
Multiple processes may upload parts of a multipart upload simultaneously.
If a call is made to `initialize(path)` to a destination where an active
If a call is made to `startUpload(path)` to a destination where an active
upload is in progress, implementations MUST perform one of the two operations.
* Reject the call as a duplicate.
@ -70,9 +81,17 @@ the in-progress upload, if it has not completed, must not be included in
the final file, in whole or in part. Implementations SHOULD raise an error
in the `putPart()` operation.
## Serialization Compatibility
Users MUST NOT expect that serialized PathHandle versions are compatible across
* different multipart uploader implementations.
* different versions of the same implementation.
That is: all clients MUST use the exact same version of Hadoop.
## Model
A File System which supports Multipart Uploads extends the existing model
A FileSystem/FileContext which supports Multipart Uploads extends the existing model
`(Directories, Files, Symlinks)` to one of `(Directories, Files, Symlinks, Uploads)`
with `Uploads` of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]]`.
@ -112,11 +131,40 @@ However, if Part Handles are rapidly recycled, there is a risk that the nominall
idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a
successor operation which used the same Upload Handle.
## Asynchronous API
All operations return `CompletableFuture<>` types which must be
subsequently evaluated to get their return values.
1. The execution of the operation MAY be a blocking operation on the calling thread.
1. If not, it SHALL be executed in a separate thread and MUST complete by the time the
future evaluation returns.
1. Some/All preconditions MAY be evaluated at the time of initial invocation,
1. All those which are not evaluated at that time MUST be evaluated during the execution
of the future.
What this means is that when an implementation interacts with a fast file system/store, all preconditions,
including the existence of files, MAY be evaluated early, whereas an implementation interacting with a
remote object store whose probes are slow MAY verify preconditions in the asynchronous phase, especially
those which interact with the remote store.
Java CompletableFutures do not work well with checked exceptions. The Hadoop codebase is still evolving the
details of the exception handling here, as more use is made of the asynchronous APIs. Assume that any
precondition failure which declares that an `IOException` MUST be raised may have that operation wrapped in a
`RuntimeException` of some form if evaluated in the future; this also holds for any other `IOException`
raised during the operations.
### `close()`
Applications MUST call `close()` after using an uploader; this is so it may release other
objects, update statistics, etc.
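
A sketch of what this means for callers (the names and paths are illustrative): the uploader is closed via try-with-resources, and an `IOException` wrapped by the future is unwrapped by hand; `FutureIOSupport.awaitFuture()` performs the equivalent unwrapping for you.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UploadHandle;

public class AsyncApiUsage {
  public static UploadHandle startOrFail(Configuration conf, Path dest)
      throws IOException, InterruptedException {
    FileSystem fs = dest.getFileSystem(conf);
    // close() is mandatory once the uploader is no longer needed.
    try (MultipartUploader uploader =
             fs.createMultipartUploader(dest.getParent()).build()) {
      try {
        // Preconditions not checked eagerly surface here, possibly wrapped.
        return uploader.startUpload(dest).get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        throw (cause instanceof IOException)
            ? (IOException) cause
            : new IOException(cause);
      }
    }
  }
}
```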
## State Changing Operations
### `UploadHandle initialize(Path path)`
### `CompletableFuture<UploadHandle> startUpload(Path)`
Initialized a Multipart Upload, returning an upload handle for use in
Starts a Multipart Upload, ultimately returning an `UploadHandle` for use in
subsequent operations.
#### Preconditions
@ -128,17 +176,15 @@ if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOE
```
If a filesystem does not support concurrent uploads to a destination,
then the following precondition is added
then the following precondition is added:
```python
if path in values(FS.Uploads) raise PathExistsException, IOException
```
#### Postconditions
The outcome of this operation is that the filesystem state is updated with a new
Once the initialization operation completes, the filesystem state is updated with a new
active upload, with a new handle, this handle being returned to the caller.
```python
@ -147,9 +193,10 @@ FS' = FS where FS'.Uploads(handle') == {}
result = handle'
```
### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)`
### `CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)`
Upload a part for the multipart upload.
Upload a part for the specific multipart upload, eventually returning an opaque part handle
representing this part of the specified upload.
#### Preconditions
@ -170,10 +217,12 @@ FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
result = partHandle'
```
The data is stored in the filesystem, pending completion.
The data is stored in the filesystem, pending completion. It MUST NOT be visible at the destination path.
It MAY be visible in a temporary path somewhere in the file system;
this is implementation-specific and MUST NOT be relied upon.
### `PathHandle complete(Path path, Map<Integer, PartHandle> parts, UploadHandle multipartUploadId)`
### `CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)`
Complete the multipart upload.
@ -188,11 +237,23 @@ uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
FS.Uploads(uploadHandle).path == path
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
parts.size() > 0
forall k in keys(parts): k > 0
forall k in keys(parts):
not exists(k2 in keys(parts)) where (parts[k] == parts[k2])
```
If there are handles in the MPU which aren't included in the map, then the omitted
parts will not be a part of the resulting file. It is up to the implementation
of the MultipartUploader to make sure the leftover parts are cleaned up.
All keys MUST be greater than zero, and there MUST not be any duplicate
references to the same part handle.
These validations MAY be performed at any point during the operation.
After a failure, there is no guarantee that a `complete()` call for this
upload with a valid map of paths will complete.
Callers SHOULD invoke `abort()` after any such failure to ensure cleanup.
If `putPart()` operations for this `uploadHandle` were performed but their
`PartHandle`s were not included in this request, the omitted
parts SHALL NOT be a part of the resulting file.
The MultipartUploader MUST clean up any such outstanding entries.
In the case of backing stores that support directories (local filesystem, HDFS,
etc), if, at the point of completion, there is now a directory at the
@ -206,14 +267,14 @@ exists(FS', path') and result = PathHandle(path')
FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
```
The PathHandle is returned by the complete operation so subsequent operations
The `PathHandle` is returned by the complete operation so subsequent operations
will be able to identify that the data has not changed in the meantime.
The order of parts in the uploaded file is that of the natural order of
parts: part 1 is ahead of part 2, etc.
parts in the map: part 1 is ahead of part 2, etc.
### `void abort(Path path, UploadHandle multipartUploadId)`
### `CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)`
Abort a multipart upload. The handle becomes invalid and not subject to reuse.
@ -233,3 +294,23 @@ FS' = FS where not uploadHandle in keys(FS'.uploads)
```
A subsequent call to `abort()` with the same handle will fail, unless
the handle has been recycled.
### `CompletableFuture<Integer> abortUploadsUnderPath(Path path)`
Perform a best-effort cleanup of all uploads under a path.
Returns a future which resolves to:

* -1 if unsupported
* >= 0: the number of uploads aborted, if supported

Because it is best-effort, a strict postcondition isn't possible.
The ideal postcondition is all uploads under the path are aborted,
and the count is the number of uploads aborted:
```python
FS'.uploads forall upload in FS.uploads:
not isDescendant(FS, path, upload.path)
return len(forall upload in FS.uploads:
isDescendant(FS, path, upload.path))
```
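
A caller therefore treats the returned count as advisory; a short sketch with an invented path:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;

public class CleanupPendingUploads {
  public static void main(String[] args) throws IOException {
    Path base = new Path(args[0]);                 // e.g. s3a://bucket/pending
    FileSystem fs = base.getFileSystem(new Configuration());
    try (MultipartUploader uploader = fs.createMultipartUploader(base).build()) {
      int aborted = awaitFuture(uploader.abortUploadsUnderPath(base));
      if (aborted < 0) {
        System.out.println("store cannot enumerate pending uploads");
      } else {
        System.out.println("aborted " + aborted + " pending upload(s)");
      }
    }
  }
}
```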


@ -137,6 +137,12 @@ public class TestFilterFileSystem {
void setQuota(Path f, long namespaceQuota, long storagespaceQuota);
void setQuotaByStorageType(Path f, StorageType type, long quota);
StorageStatistics getStorageStatistics();
/*
Not passed through as the inner implementation will miss features
of the filter such as checksums.
*/
MultipartUploaderBuilder createMultipartUploader(Path basePath);
}
@Test
@ -278,6 +284,23 @@ public class TestFilterFileSystem {
verify(mockFs).rename(eq(src), eq(dst), eq(opt));
}
/**
* Verify that filterFS always returns false, even if local/rawlocal
* ever implement multipart uploads.
*/
@Test
public void testFilterPathCapabilites() throws Exception {
try (FilterFileSystem flfs = new FilterLocalFileSystem()) {
flfs.initialize(URI.create("filter:/"), conf);
Path src = new Path("/src");
assertFalse(
"hasPathCapability(FS_MULTIPART_UPLOADER) should have failed for "
+ flfs,
flfs.hasPathCapability(src,
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
}
}
private void checkInit(FilterFileSystem fs, boolean expectInit)
throws Exception {
URI uri = URI.create("filter:/");


@ -248,6 +248,9 @@ public class TestHarFileSystem {
CompletableFuture<FSDataInputStream> openFileWithOptions(
Path path,
OpenFileParameters parameters) throws IOException;
MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException;
}
@Test


@ -26,8 +26,10 @@ import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import com.google.common.base.Charsets;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
@ -35,22 +37,31 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests of multipart uploads.
* <p></p>
* <i>Note</i>: some of the tests get a random uploader between
* the two which are available. If tests fail intermittently,
* it may be because different uploaders are being selected.
*/
public abstract class AbstractContractMultipartUploaderTest extends
AbstractFSContractTestBase {
@ -63,36 +74,44 @@ public abstract class AbstractContractMultipartUploaderTest extends
*/
protected static final int SMALL_FILE = 100;
private MultipartUploader mpu;
private MultipartUploader mpu2;
protected static final int CONSISTENCY_INTERVAL = 1000;
private MultipartUploader uploader0;
private MultipartUploader uploader1;
private final Random random = new Random();
private UploadHandle activeUpload;
private Path activeUploadPath;
protected String getMethodName() {
return methodName.getMethodName();
}
@Override
public void setup() throws Exception {
super.setup();
Configuration conf = getContract().getConf();
mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
final FileSystem fs = getFileSystem();
Path testPath = getContract().getTestPath();
uploader0 = fs.createMultipartUploader(testPath).build();
uploader1 = fs.createMultipartUploader(testPath).build();
}
@Override
public void teardown() throws Exception {
if (mpu!= null && activeUpload != null) {
MultipartUploader uploader = getUploader(1);
if (uploader != null) {
if (activeUpload != null) {
abortUploadQuietly(activeUpload, activeUploadPath);
}
try {
mpu.abort(activeUploadPath, activeUpload);
} catch (FileNotFoundException ignored) {
/* this is fine */
// round off with an abort of all uploads
Path teardown = getContract().getTestPath();
LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
CompletableFuture<Integer> f
= uploader.abortUploadsUnderPath(teardown);
f.get();
} catch (Exception e) {
LOG.info("in teardown", e);
LOG.warn("Exeception in teardown", e);
}
}
cleanupWithLogger(LOG, mpu, mpu2);
cleanupWithLogger(LOG, uploader0, uploader1);
super.teardown();
}
@ -192,16 +211,16 @@ public abstract class AbstractContractMultipartUploaderTest extends
* @param index index of upload
* @return an uploader
*/
protected MultipartUploader mpu(int index) {
return (index % 2 == 0) ? mpu : mpu2;
protected MultipartUploader getUploader(int index) {
return (index % 2 == 0) ? uploader0 : uploader1;
}
/**
* Pick a multipart uploader at random.
* @return an uploader
*/
protected MultipartUploader randomMpu() {
return mpu(random.nextInt(10));
protected MultipartUploader getRandomUploader() {
return getUploader(random.nextInt(10));
}
/**
@ -211,39 +230,71 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testSingleUpload() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
int size = SMALL_FILE;
byte[] payload = generatePayload(1, size);
origDigest.update(payload);
// use a single uploader
// note: the same uploader is reused here because doing so exposed a bug in
// the S3Guard DDB bulk operation state upload - the previous operation had
// added an entry to the ongoing state; this second call
// was interpreted as an inconsistent write.
MultipartUploader completer = uploader0;
// and upload with uploader 1 to validate cross-uploader uploads
PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
partHandles.put(1, partHandle);
PathHandle fd = completeUpload(file, uploadHandle, partHandles,
origDigest,
size);
PathHandle fd = complete(completer, uploadHandle, file,
partHandles);
validateUpload(file, origDigest, size);
// verify that if the implementation processes data immediately
// then a second attempt at the upload will fail.
if (finalizeConsumesUploadIdImmediately()) {
intercept(FileNotFoundException.class,
() -> mpu.complete(file, partHandles, uploadHandle));
() -> complete(completer, uploadHandle, file, partHandles));
} else {
PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
// otherwise, the same or other uploader can try again.
PathHandle fd2 = complete(completer, uploadHandle, file, partHandles);
assertArrayEquals("Path handles differ", fd.toByteArray(),
fd2.toByteArray());
}
}
/**
* Initialize an upload.
* Complete IO for a specific uploader; await the response.
* @param uploader uploader
* @param uploadHandle Identifier
* @param file Target path for upload
* @param partHandles handles map of part number to part handle
* @return unique PathHandle identifier for the uploaded file.
*/
protected PathHandle complete(
final MultipartUploader uploader,
final UploadHandle uploadHandle,
final Path file,
final Map<Integer, PartHandle> partHandles)
throws IOException {
try (DurationInfo d =
new DurationInfo(LOG, "Complete upload to %s", file)) {
return awaitFuture(
uploader.complete(uploadHandle, file, partHandles));
}
}
/**
* Start an upload.
* This saves the path and upload handle as the active
* upload, for aborting in teardown.
* @param dest destination
* @return the handle
* @throws IOException failure to initialize
*/
protected UploadHandle initializeUpload(final Path dest) throws IOException {
protected UploadHandle startUpload(final Path dest) throws IOException {
activeUploadPath = dest;
activeUpload = randomMpu().initialize(dest);
activeUpload = awaitFuture(getRandomUploader().startUpload(dest));
return activeUpload;
}
@ -283,12 +334,17 @@ public abstract class AbstractContractMultipartUploaderTest extends
final int index,
final byte[] payload) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
PartHandle partHandle = mpu(index)
.putPart(file,
new ByteArrayInputStream(payload),
index,
uploadHandle,
payload.length);
PartHandle partHandle;
try (DurationInfo d =
new DurationInfo(LOG, "Put part %d (size %s) %s",
index,
payload.length,
file)) {
partHandle = awaitFuture(getUploader(index)
.putPart(uploadHandle, index, file,
new ByteArrayInputStream(payload),
payload.length));
}
timer.end("Uploaded part %s", index);
LOG.info("Upload bandwidth {} MB/s",
timer.bandwidthDescription(payload.length));
@ -296,7 +352,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
}
/**
* Complete an upload with the active MPU instance.
* Complete an upload with a random uploader.
* @param file destination
* @param uploadHandle handle
* @param partHandles map of handles
@ -312,36 +368,64 @@ public abstract class AbstractContractMultipartUploaderTest extends
final int expectedLength) throws IOException {
PathHandle fd = complete(file, uploadHandle, partHandles);
FileStatus status = verifyPathExists(getFileSystem(),
"Completed file", file);
assertEquals("length of " + status,
expectedLength, status.getLen());
validateUpload(file, origDigest, expectedLength);
return fd;
}
/**
* Validate a completed upload: the file must exist with the expected
* length and, if a digest is supplied, matching contents.
* @param file destination
* @param origDigest digest of source data (may be null)
* @param expectedLength expected length of result.
* @throws IOException IO failure
*/
private void validateUpload(final Path file,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
verifyPathExists(getFileSystem(),
"Completed file", file);
verifyFileLength(file, expectedLength);
if (origDigest != null) {
verifyContents(file, origDigest, expectedLength);
}
return fd;
}
/**
* Verify the contents of a file.
* @param file path
* @param origDigest digest
* @param expectedLength expected length (for logging B/W)
* @param expectedLength expected length (for logging download bandwidth)
* @throws IOException IO failure
*/
protected void verifyContents(final Path file,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
assertArrayEquals("digest of source and " + file
+ " differ",
origDigest.digest(), digest(file));
Assertions.assertThat(digest(file))
.describedAs("digest of uploaded file %s", file)
.isEqualTo(origDigest.digest());
timer2.end("Completed digest", file);
LOG.info("Download bandwidth {} MB/s",
timer2.bandwidthDescription(expectedLength));
}
/**
* Verify the length of a file.
* @param file path
* @param expectedLength expected length
* @throws IOException IO failure
*/
private void verifyFileLength(final Path file, final long expectedLength)
throws IOException {
FileStatus st = getFileSystem().getFileStatus(file);
Assertions.assertThat(st)
.describedAs("Uploaded file %s", st)
.matches(FileStatus::isFile)
.extracting(FileStatus::getLen)
.isEqualTo(expectedLength);
}
/**
* Perform the inner complete without verification.
* @param file destination path
@ -353,21 +437,37 @@ public abstract class AbstractContractMultipartUploaderTest extends
private PathHandle complete(final Path file,
final UploadHandle uploadHandle,
final Map<Integer, PartHandle> partHandles) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
timer.end("Completed upload to %s", file);
return fd;
return complete(getRandomUploader(), uploadHandle, file,
partHandles);
}
/**
* Abort an upload.
* @param file path
* @param uploadHandle handle
* @param file path
* @throws IOException failure
*/
private void abortUpload(final Path file, UploadHandle uploadHandle)
private void abortUpload(UploadHandle uploadHandle,
final Path file)
throws IOException {
randomMpu().abort(file, uploadHandle);
try (DurationInfo d =
new DurationInfo(LOG, "Abort upload to %s", file)) {
awaitFuture(getRandomUploader().abort(uploadHandle, file));
}
}
/**
* Abort an upload; swallows exceptions.
* @param uploadHandle handle
* @param file path
*/
private void abortUploadQuietly(UploadHandle uploadHandle, Path file) {
try {
abortUpload(uploadHandle, file);
} catch (FileNotFoundException ignored) {
} catch (Exception e) {
LOG.info("aborting {}: {}", file, e.toString());
}
}
/**
@ -377,10 +477,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testMultipartUpload() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
final int payloadCount = getTestPayloadCount();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
origDigest);
@ -400,16 +500,16 @@ public abstract class AbstractContractMultipartUploaderTest extends
FileSystem fs = getFileSystem();
Path file = path("testMultipartUpload");
try (MultipartUploader uploader =
MultipartUploaderFactory.get(fs, null)) {
UploadHandle uploadHandle = uploader.initialize(file);
fs.createMultipartUploader(file).build()) {
UploadHandle uploadHandle = uploader.startUpload(file).get();
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
byte[] payload = new byte[0];
origDigest.update(payload);
InputStream is = new ByteArrayInputStream(payload);
PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
payload.length);
PartHandle partHandle = awaitFuture(
uploader.putPart(uploadHandle, 1, file, is, payload.length));
partHandles.put(1, partHandle);
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
}
@ -422,7 +522,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testUploadEmptyBlock() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
completeUpload(file, uploadHandle, partHandles, null, 0);
@ -435,10 +535,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testMultipartUploadReverseOrder() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
final int payloadCount = getTestPayloadCount();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
byte[] payload = generatePayload(i);
origDigest.update(payload);
@ -459,7 +559,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
throws Exception {
describe("Upload in reverse order and the part numbers are not contiguous");
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = 2 * getTestPayloadCount();
for (int i = 2; i <= payloadCount; i += 2) {
@ -482,22 +582,22 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testMultipartUploadAbort() throws Exception {
describe("Upload and then abort it before completing");
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
int end = 10;
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = 12; i > 10; i--) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
}
abortUpload(file, uploadHandle);
abortUpload(uploadHandle, file);
String contents = "ThisIsPart49\n";
int len = contents.getBytes(Charsets.UTF_8).length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
intercept(IOException.class,
() -> mpu.putPart(file, is, 49, uploadHandle, len));
() -> awaitFuture(
uploader0.putPart(uploadHandle, 49, file, is, len)));
intercept(IOException.class,
() -> mpu.complete(file, partHandles, uploadHandle));
() -> complete(uploader0, uploadHandle, file, partHandles));
assertPathDoesNotExist("Uploaded file should not exist", file);
@ -505,9 +605,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
// consumed by finalization operations (complete, abort).
if (finalizeConsumesUploadIdImmediately()) {
intercept(FileNotFoundException.class,
() -> abortUpload(file, uploadHandle));
() -> abortUpload(uploadHandle, file));
} else {
abortUpload(file, uploadHandle);
abortUpload(uploadHandle, file);
}
}
@ -519,31 +619,55 @@ public abstract class AbstractContractMultipartUploaderTest extends
Path file = methodPath();
ByteBuffer byteBuffer = ByteBuffer.wrap(
"invalid-handle".getBytes(Charsets.UTF_8));
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
intercept(FileNotFoundException.class,
() -> abortUpload(file, uploadHandle));
() -> abortUpload(BBUploadHandle.from(byteBuffer), file));
}
/**
* Trying to abort with a handle of size 0 must fail.
* Trying to abort an upload with no data does not create a file.
*/
@Test
public void testAbortEmptyUpload() throws Exception {
describe("initialize upload and abort before uploading data");
Path file = methodPath();
abortUpload(file, initializeUpload(file));
abortUpload(startUpload(file), file);
assertPathDoesNotExist("Uploaded file should not exist", file);
}
/**
* Abort all pending uploads under a path; if the store supports this,
* at least one upload must be aborted and the file must not exist.
*/
@Test
public void testAbortAllPendingUploads() throws Exception {
describe("initialize upload and abort the pending upload");
Path path = methodPath();
Path file = new Path(path, "child");
UploadHandle upload = startUpload(file);
try {
CompletableFuture<Integer> oF
= getRandomUploader().abortUploadsUnderPath(path.getParent());
int abortedUploads = awaitFuture(oF);
if (abortedUploads >= 0) {
// uploads can be aborted
Assertions.assertThat(abortedUploads)
.describedAs("Number of uploads aborted")
.isGreaterThanOrEqualTo(1);
assertPathDoesNotExist("Uploaded file should not exist", file);
}
} finally {
abortUploadQuietly(upload, file);
}
}
/**
* Trying to abort with a handle of size 0 must fail.
*/
@Test
public void testAbortEmptyUploadHandle() throws Exception {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
intercept(IllegalArgumentException.class,
() -> abortUpload(methodPath(), uploadHandle));
() -> abortUpload(BBUploadHandle.from(byteBuffer), methodPath()));
}
/**
@ -553,10 +677,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testCompleteEmptyUpload() throws Exception {
describe("Expect an empty MPU to fail, but still be abortable");
Path dest = methodPath();
UploadHandle handle = initializeUpload(dest);
UploadHandle handle = startUpload(dest);
intercept(IllegalArgumentException.class,
() -> mpu.complete(dest, new HashMap<>(), handle));
abortUpload(dest, handle);
() -> complete(uploader0, handle, dest, new HashMap<>()));
abortUpload(handle, dest);
}
/**
@ -571,7 +695,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
byte[] payload = generatePayload(1);
InputStream is = new ByteArrayInputStream(payload);
intercept(IllegalArgumentException.class,
() -> mpu.putPart(dest, is, 1, emptyHandle, payload.length));
() -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
}
/**
@ -581,7 +705,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testCompleteEmptyUploadID() throws Exception {
describe("Expect IllegalArgumentException when complete uploadID is empty");
Path dest = methodPath();
UploadHandle realHandle = initializeUpload(dest);
UploadHandle realHandle = startUpload(dest);
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
Map<Integer, PartHandle> partHandles = new HashMap<>();
@ -590,14 +714,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
partHandles.put(1, partHandle);
intercept(IllegalArgumentException.class,
() -> mpu.complete(dest, partHandles, emptyHandle));
() -> complete(uploader0, emptyHandle, dest, partHandles));
// and, while things are setup, attempt to complete with
// a part index of 0
partHandles.clear();
partHandles.put(0, partHandle);
intercept(IllegalArgumentException.class,
() -> mpu.complete(dest, partHandles, realHandle));
() -> complete(uploader0, realHandle, dest, partHandles));
}
/**
@ -610,7 +734,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testDirectoryInTheWay() throws Exception {
FileSystem fs = getFileSystem();
Path file = methodPath();
UploadHandle uploadHandle = initializeUpload(file);
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
int size = SMALL_FILE;
PartHandle partHandle = putPart(file, uploadHandle, 1,
@ -622,7 +746,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
() -> completeUpload(file, uploadHandle, partHandles, null,
size));
// abort should still work
abortUpload(file, uploadHandle);
abortUpload(uploadHandle, file);
}
@Test
@ -630,46 +754,44 @@ public abstract class AbstractContractMultipartUploaderTest extends
// if the FS doesn't support concurrent uploads, this test is
// required to fail during the second initialization.
final boolean concurrent = supportsConcurrentUploadsToSamePath();
boolean concurrent = supportsConcurrentUploadsToSamePath();
describe("testing concurrent uploads, MPU support for this is "
+ concurrent);
final FileSystem fs = getFileSystem();
final Path file = methodPath();
final int size1 = SMALL_FILE;
final int partId1 = 1;
final byte[] payload1 = generatePayload(partId1, size1);
final MessageDigest digest1 = DigestUtils.getMd5Digest();
Path file = methodPath();
int size1 = SMALL_FILE;
int partId1 = 1;
byte[] payload1 = generatePayload(partId1, size1);
MessageDigest digest1 = DigestUtils.getMd5Digest();
digest1.update(payload1);
final UploadHandle upload1 = initializeUpload(file);
final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
UploadHandle upload1 = startUpload(file);
Map<Integer, PartHandle> partHandles1 = new HashMap<>();
// initiate part 2
// by using a different size, it's straightforward to see which
// version is visible, before reading/digesting the contents
final int size2 = size1 * 2;
final int partId2 = 2;
final byte[] payload2 = generatePayload(partId1, size2);
final MessageDigest digest2 = DigestUtils.getMd5Digest();
int size2 = size1 * 2;
int partId2 = 2;
byte[] payload2 = generatePayload(partId1, size2);
MessageDigest digest2 = DigestUtils.getMd5Digest();
digest2.update(payload2);
final UploadHandle upload2;
UploadHandle upload2;
try {
upload2 = initializeUpload(file);
upload2 = startUpload(file);
Assume.assumeTrue(
"The Filesystem is unexpectedly supporting concurrent uploads",
concurrent);
} catch (IOException e) {
if (!concurrent) {
// this is expected, so end the test
LOG.debug("Expected exception raised on concurrent uploads {}", e);
LOG.debug("Expected exception raised on concurrent uploads", e);
return;
} else {
throw e;
}
}
final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
Map<Integer, PartHandle> partHandles2 = new HashMap<>();
assertNotEquals("Upload handles match", upload1, upload2);
@ -686,13 +808,21 @@ public abstract class AbstractContractMultipartUploaderTest extends
// now upload part 2.
complete(file, upload2, partHandles2);
// and await the visible length to match
eventually(timeToBecomeConsistentMillis(), 500,
() -> {
FileStatus status = fs.getFileStatus(file);
assertEquals("File length in " + status,
size2, status.getLen());
});
eventually(timeToBecomeConsistentMillis(),
() -> verifyFileLength(file, size2),
new LambdaTestUtils.ProportionalRetryInterval(
CONSISTENCY_INTERVAL,
timeToBecomeConsistentMillis()));
verifyContents(file, digest2, size2);
}
@Test
public void testPathCapabilities() throws Throwable {
FileSystem fs = getFileSystem();
Assertions.assertThat(fs.hasPathCapability(getContract().getTestPath(),
CommonPathCapabilities.FS_MULTIPART_UPLOADER))
.describedAs("fs %s, lacks multipart upload capability", fs)
.isTrue();
}
}
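For orientation, the flow these contract tests exercise can be sketched as plain client code. This is an illustrative sketch only, assuming the builder-based asynchronous uploader API added by this patch; the configuration, destination path and payload below are made-up examples, not part of the change.
// Minimal sketch: create an uploader via the FileSystem builder, upload one
// part and complete the upload, awaiting each asynchronous result.
FileSystem fs = FileSystem.get(new Configuration());
Path dest = new Path("/tmp/example-upload");
try (MultipartUploader uploader =
    fs.createMultipartUploader(dest).build()) {
  UploadHandle upload = awaitFuture(uploader.startUpload(dest));
  byte[] data = "example part".getBytes(StandardCharsets.UTF_8);
  Map<Integer, PartHandle> parts = new HashMap<>();
  parts.put(1, awaitFuture(uploader.putPart(upload, 1, dest,
      new ByteArrayInputStream(data), data.length)));
  PathHandle handle = awaitFuture(uploader.complete(upload, dest, parts));
}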

View File

@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.localfs;
import org.junit.Assume;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Test the FileSystemMultipartUploader on local file system.
*/
public class TestLocalFSContractMultipartUploader
extends AbstractContractMultipartUploaderTest {
@Override
public void setup() throws Exception {
Assume.assumeTrue("Skipping until HDFS-13934", false);
super.setup();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
/**
* There is no real need to upload any particular size.
* @return 1 kilobyte
*/
@Override
protected int partSizeInBytes() {
return 1024;
}
@Override
protected boolean finalizeConsumesUploadIdImmediately() {
return true;
}
@Override
protected boolean supportsConcurrentUploadsToSamePath() {
return true;
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.InvalidPathHandleException;
import org.apache.hadoop.fs.PartialListing;
import org.apache.hadoop.fs.MultipartUploaderBuilder;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
@ -66,6 +67,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
@ -3615,4 +3617,10 @@ public class DistributedFileSystem extends FileSystem
return super.hasPathCapability(p, capability);
}
@Override
public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
throws IOException {
return new FileSystemMultipartUploaderBuilder(this, basePath);
}
}
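As a brief, hedged illustration of this override: against HDFS the generic FileSystemMultipartUploaderBuilder is returned, so callers obtain an uploader the same way as for any other store. Here dfs is an assumed DistributedFileSystem instance and the path is invented.
MultipartUploader uploader =
    dfs.createMultipartUploader(new Path("/user/example/uploads")).build();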

View File

@ -47,6 +47,7 @@ public final class DfsPathCapabilities {
case CommonPathCapabilities.FS_CHECKSUMS:
case CommonPathCapabilities.FS_CONCAT:
case CommonPathCapabilities.FS_LIST_CORRUPT_FILE_BLOCKS:
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
case CommonPathCapabilities.FS_PATHHANDLES:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_SNAPSHOTS:

View File

@ -76,10 +76,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.MultipartUploaderBuilder;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@ -2125,6 +2127,12 @@ public class WebHdfsFileSystem extends FileSystem
return super.hasPathCapability(p, capability);
}
@Override
public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
throws IOException {
return new FileSystemMultipartUploaderBuilder(this, basePath);
}
/**
* This class is used for opening, reading, and seeking files while using the
* WebHdfsFileSystem. This class will invoke the retry policy when performing

View File

@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.hdfs.DFSMultipartUploaderFactory

View File

@ -108,8 +108,11 @@ import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.io.IOUtils;
@ -4493,6 +4496,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
ETAG_CHECKSUM_ENABLED_DEFAULT);
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
return true;
default:
return super.hasPathCapability(p, capability);
}
@ -4722,6 +4728,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return result;
}
@Override
public S3AMultipartUploaderBuilder createMultipartUploader(
final Path basePath)
throws IOException {
StoreContext ctx = createStoreContext();
return new S3AMultipartUploaderBuilder(this,
getWriteOperationHelper(),
ctx,
basePath,
new S3AMultipartUploaderStatisticsImpl(ctx::incrementStatistic));
}
/**
* Build an immutable store context.
* If called while the FS is being initialized,
@ -4731,24 +4749,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContext(
getUri(),
getBucket(),
getConf(),
getUsername(),
owner,
boundedThreadPool,
executorCapacity,
invoker,
getInstrumentation(),
getStorageStatistics(),
getInputPolicy(),
changeDetectionPolicy,
enableMultiObjectsDelete,
metadataStore,
useListV1,
new ContextAccessorsImpl(),
getTtlTimeProvider());
return new StoreContextBuilder().setFsURI(getUri())
.setBucket(getBucket())
.setConfiguration(getConf())
.setUsername(getUsername())
.setOwner(owner)
.setExecutor(boundedThreadPool)
.setExecutorCapacity(executorCapacity)
.setInvoker(invoker)
.setInstrumentation(getInstrumentation())
.setStorageStatistics(getStorageStatistics())
.setInputPolicy(getInputPolicy())
.setChangeDetectionPolicy(changeDetectionPolicy)
.setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
.setMetadataStore(metadataStore)
.setUseListV1(useListV1)
.setContextAccessors(new ContextAccessorsImpl())
.setTimeProvider(getTtlTimeProvider())
.build();
}
/**
@ -4776,5 +4794,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public String getBucketLocation() throws IOException {
return S3AFileSystem.this.getBucketLocation();
}
@Override
public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}
}
}

View File

@ -193,7 +193,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
S3GUARD_METADATASTORE_AUTHORITATIVE_DIRECTORIES_UPDATED,
STORE_IO_THROTTLED,
DELEGATION_TOKENS_ISSUED,
FILES_DELETE_REJECTED
FILES_DELETE_REJECTED,
MULTIPART_INSTANTIATED,
MULTIPART_PART_PUT,
MULTIPART_PART_PUT_BYTES,
MULTIPART_UPLOAD_ABORTED,
MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED,
MULTIPART_UPLOAD_COMPLETED,
MULTIPART_UPLOAD_STARTED
};
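// Editorial note, not part of the patch: the new MULTIPART_* counters are
// expected to be driven by S3AMultipartUploaderStatisticsImpl, which
// S3AFileSystem wires to the store context's incrementStatistic callback
// when it builds the uploader.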
private static final Statistic[] GAUGES_TO_CREATE = {

View File

@ -1,216 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBPartHandle;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
/**
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
* upload mechanism.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class S3AMultipartUploader extends MultipartUploader {
private final S3AFileSystem s3a;
/** Header for Parts: {@value}. */
public static final String HEADER = "S3A-part01";
public S3AMultipartUploader(FileSystem fs, Configuration conf) {
Preconditions.checkArgument(fs instanceof S3AFileSystem,
"Wrong filesystem: expected S3A but got %s", fs);
s3a = (S3AFileSystem) fs;
}
@Override
public UploadHandle initialize(Path filePath) throws IOException {
final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
String uploadId = writeHelper.initiateMultiPartUpload(key);
return BBUploadHandle.from(ByteBuffer.wrap(
uploadId.getBytes(Charsets.UTF_8)));
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException {
checkPutArguments(filePath, inputStream, partNumber, uploadId,
lengthInBytes);
byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
String key = s3a.pathToKey(filePath);
final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
UploadPartRequest request = writeHelper.newUploadPartRequest(key,
uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L);
UploadPartResult result = writeHelper.uploadPart(request);
String eTag = result.getETag();
return BBPartHandle.from(
ByteBuffer.wrap(
buildPartHandlePayload(eTag, lengthInBytes)));
}
@Override
public PathHandle complete(Path filePath,
Map<Integer, PartHandle> handleMap,
UploadHandle uploadId)
throws IOException {
byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
checkPartHandles(handleMap);
List<Map.Entry<Integer, PartHandle>> handles =
new ArrayList<>(handleMap.entrySet());
handles.sort(Comparator.comparingInt(Map.Entry::getKey));
final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
ArrayList<PartETag> eTags = new ArrayList<>();
eTags.ensureCapacity(handles.size());
long totalLength = 0;
for (Map.Entry<Integer, PartHandle> handle : handles) {
byte[] payload = handle.getValue().toByteArray();
Pair<Long, String> result = parsePartHandlePayload(payload);
totalLength += result.getLeft();
eTags.add(new PartETag(handle.getKey(), result.getRight()));
}
AtomicInteger errorCount = new AtomicInteger(0);
CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
key, uploadIdStr, eTags, totalLength, errorCount);
byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
return (PathHandle) () -> ByteBuffer.wrap(eTag);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) throws IOException {
final byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
writeHelper.abortMultipartCommit(key, uploadIdString);
}
/**
* Factory for creating MultipartUploader objects for s3a:// FileSystems.
*/
public static class Factory extends MultipartUploaderFactory {
@Override
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (FS_S3A.equals(fs.getScheme())) {
return new S3AMultipartUploader(fs, conf);
}
return null;
}
}
/**
* Build the payload for marshalling.
* @param eTag upload etag
* @param len length
* @return a byte array to marshall.
* @throws IOException error writing the payload
*/
@VisibleForTesting
static byte[] buildPartHandlePayload(String eTag, long len)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
"Empty etag");
Preconditions.checkArgument(len >= 0,
"Invalid length");
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try(DataOutputStream output = new DataOutputStream(bytes)) {
output.writeUTF(HEADER);
output.writeLong(len);
output.writeUTF(eTag);
}
return bytes.toByteArray();
}
/**
* Parse the payload marshalled as a part handle.
* @param data handle data
* @return the length and etag
* @throws IOException error reading the payload
*/
@VisibleForTesting
static Pair<Long, String> parsePartHandlePayload(byte[] data)
throws IOException {
try(DataInputStream input =
new DataInputStream(new ByteArrayInputStream(data))) {
final String header = input.readUTF();
if (!HEADER.equals(header)) {
throw new IOException("Wrong header string: \"" + header + "\"");
}
final long len = input.readLong();
final String etag = input.readUTF();
if (len < 0) {
throw new IOException("Negative length");
}
return Pair.of(len, etag);
}
}
}

View File

@ -234,7 +234,29 @@ public enum Statistic {
"Rate of S3 request throttling"),
DELEGATION_TOKENS_ISSUED("delegation_tokens_issued",
"Number of delegation tokens issued");
"Number of delegation tokens issued"),
MULTIPART_INSTANTIATED(
"multipart_instantiated",
"Multipart Uploader Instantiated"),
MULTIPART_PART_PUT(
"multipart_part_put",
"Multipart Part Put Operation"),
MULTIPART_PART_PUT_BYTES(
"multipart_part_put_bytes",
"Multipart Part Put Bytes"),
MULTIPART_UPLOAD_ABORTED(
"multipart_upload_aborted",
"Multipart Upload Aborted"),
MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED(
"multipart_upload_abort_under_path_invoked",
"Multipart Upload Abort Udner Path Invoked"),
MULTIPART_UPLOAD_COMPLETED(
"multipart_upload_completed",
"Multipart Upload Completed"),
MULTIPART_UPLOAD_STARTED(
"multipart_upload_started",
"Multipart Upload Started");
private static final Map<String, Statistic> SYMBOL_MAP =
new HashMap<>(Statistic.values().length);

View File

@ -87,7 +87,7 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class WriteOperationHelper {
public class WriteOperationHelper implements WriteOperations {
private static final Logger LOG =
LoggerFactory.getLogger(WriteOperationHelper.class);
@ -254,11 +254,11 @@ public class WriteOperationHelper {
Retried retrying,
@Nullable BulkOperationState operationState) throws IOException {
if (partETags.isEmpty()) {
throw new IOException(
"No upload parts in multipart upload to " + destKey);
throw new PathIOException(destKey,
"No upload parts in multipart upload");
}
CompleteMultipartUploadResult uploadResult =
invoker.retry("Completing multipart commit", destKey,
invoker.retry("Completing multipart upload", destKey,
true,
retrying,
() -> {
@ -560,8 +560,20 @@ public class WriteOperationHelper {
*/
public BulkOperationState initiateCommitOperation(
Path path) throws IOException {
return initiateOperation(path, BulkOperationState.OperationType.Commit);
}
/**
* Initiate a commit operation through any metastore.
* @param path path under which the writes will all take place.
* @param operationType operation to initiate
* @return a possibly null operation state from the metastore.
* @throws IOException failure to instantiate.
*/
public BulkOperationState initiateOperation(final Path path,
final BulkOperationState.OperationType operationType) throws IOException {
return S3Guard.initiateBulkWrite(owner.getMetadataStore(),
BulkOperationState.OperationType.Commit, path);
operationType, path);
}
/**

View File

@ -0,0 +1,335 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
/**
* Operations to update the store.
* This is effectively a private internal API for classes used as part
* of the S3A implementation.
* New extension points SHOULD use this interface - provided there
* is no plan to backport to previous versions. In those situations,
* use `WriteOperationHelper` directly.
* @since Hadoop 3.3.0
*/
public interface WriteOperations {
/**
* Execute a function with retry processing.
* @param action action to execute (used in error messages)
* @param path path of work (used in error messages)
* @param idempotent does the operation have semantics
* which mean that it can be retried even if it was already executed?
* @param operation operation to execute
* @param <T> type of return value
* @return the result of the call
* @throws IOException any IOE raised, or translated exception
*/
<T> T retry(String action,
String path,
boolean idempotent,
Invoker.Operation<T> operation)
throws IOException;
/**
* Create a {@link PutObjectRequest} request against the specific key.
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @return the request
*/
PutObjectRequest createPutObjectRequest(String destKey,
InputStream inputStream, long length);
/**
* Create a {@link PutObjectRequest} request to upload a file.
* @param dest key to PUT to.
* @param sourceFile source file
* @return the request
*/
PutObjectRequest createPutObjectRequest(String dest,
File sourceFile);
/**
* Callback on a successful write.
* @param length length of the write
*/
void writeSuccessful(long length);
/**
* Callback on a write failure.
* @param ex Any exception raised which triggered the failure.
*/
void writeFailed(Exception ex);
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
* @param length size, if known. Use -1 for not known
* @return a new metadata instance
*/
ObjectMetadata newObjectMetadata(long length);
/**
* Start the multipart upload process.
* Retry policy: retrying, translated.
* @param destKey destination of upload
* @return the upload result containing the ID
* @throws IOException IO problem
*/
@Retries.RetryTranslated
String initiateMultiPartUpload(String destKey) throws IOException;
/**
* This completes a multipart upload to the destination key via
* {@code finalizeMultipartUpload()}.
* Retry policy: retrying, translated.
* Retries increment the {@code errorCount} counter.
* @param destKey destination
* @param uploadId multipart operation Id
* @param partETags list of partial uploads
* @param length length of the upload
* @param errorCount a counter incremented by 1 on every error; for
* use in statistics
* @return the result of the operation.
* @throws IOException if problems arose which could not be retried, or
* the retry count was exceeded
*/
@Retries.RetryTranslated
CompleteMultipartUploadResult completeMPUwithRetries(
String destKey,
String uploadId,
List<PartETag> partETags,
long length,
AtomicInteger errorCount)
throws IOException;
/**
* Abort a multipart upload operation.
* @param destKey destination key of the upload
* @param uploadId multipart operation Id
* @param retrying callback invoked on every retry
* @throws IOException failure to abort
* @throws FileNotFoundException if the abort ID is unknown
*/
@Retries.RetryTranslated
void abortMultipartUpload(String destKey, String uploadId,
Invoker.Retried retrying)
throws IOException;
/**
* Abort a multipart upload operation.
* @param upload upload to abort.
* @throws IOException on problems.
*/
@Retries.RetryTranslated
void abortMultipartUpload(MultipartUpload upload)
throws IOException;
/**
* Abort multipart uploads under a path: limited to the first
* few hundred.
* @param prefix prefix for uploads to abort
* @return a count of aborts
* @throws IOException trouble; FileNotFoundExceptions are swallowed.
*/
@Retries.RetryTranslated
int abortMultipartUploadsUnderPath(String prefix)
throws IOException;
/**
* Abort a multipart commit operation.
* @param destKey destination key of ongoing operation
* @param uploadId multipart operation Id
* @throws IOException on problems.
* @throws FileNotFoundException if the abort ID is unknown
*/
@Retries.RetryTranslated
void abortMultipartCommit(String destKey, String uploadId)
throws IOException;
/**
* Create and initialize a part request of a multipart upload.
* Exactly one of: {@code uploadStream} or {@code sourceFile}
* must be specified.
* A subset of the file may be posted, by providing the starting point
* in {@code offset} and a length of block in {@code size} equal to
* or less than the remaining bytes.
* @param destKey destination key of ongoing operation
* @param uploadId ID of ongoing upload
* @param partNumber current part number of the upload
* @param size amount of data
* @param uploadStream source of data to upload
* @param sourceFile optional source file.
* @param offset offset in file to start reading.
* @return the request.
* @throws IllegalArgumentException if the parameters are invalid.
* @throws PathIOException if the part number is out of range.
*/
UploadPartRequest newUploadPartRequest(
String destKey,
String uploadId,
int partNumber,
int size,
InputStream uploadStream,
File sourceFile,
Long offset) throws PathIOException;
/**
* PUT an object directly (i.e. not via the transfer manager).
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
* @param putObjectRequest the request
* @return the upload initiated
* @throws IOException on problems
*/
@Retries.RetryTranslated
PutObjectResult putObject(PutObjectRequest putObjectRequest)
throws IOException;
/**
* PUT an object via the transfer manager.
* @param putObjectRequest the request
* @return the result of the operation
* @throws IOException on problems
*/
@Retries.RetryTranslated
UploadResult uploadObject(PutObjectRequest putObjectRequest)
throws IOException;
/**
* Revert a commit by deleting the file.
* Relies on retry code in the filesystem.
* @throws IOException on problems
* @param destKey destination key
* @param operationState operational state for a bulk update
*/
@Retries.OnceTranslated
void revertCommit(String destKey,
@Nullable BulkOperationState operationState) throws IOException;
/**
* This completes a multipart upload to the destination key via
* {@code finalizeMultipartUpload()}.
* Retry policy: retrying, translated.
* Retries increment the {@code errorCount} counter.
* @param destKey destination
* @param uploadId multipart operation Id
* @param partETags list of partial uploads
* @param length length of the upload
* @param operationState operational state for a bulk update
* @return the result of the operation.
* @throws IOException if problems arose which could not be retried, or
* the retry count was exceeded
*/
@Retries.RetryTranslated
CompleteMultipartUploadResult commitUpload(
String destKey,
String uploadId,
List<PartETag> partETags,
long length,
@Nullable BulkOperationState operationState)
throws IOException;
/**
* Initiate a commit operation through any metastore.
* @param path path under which the writes will all take place.
* @return a possibly null operation state from the metastore.
* @throws IOException failure to instantiate.
*/
BulkOperationState initiateCommitOperation(
Path path) throws IOException;
/**
* Initiate a commit operation through any metastore.
* @param path path under which the writes will all take place.
* @param operationType operation to initiate
* @return a possibly null operation state from the metastore.
* @throws IOException failure to instantiate.
*/
BulkOperationState initiateOperation(Path path,
BulkOperationState.OperationType operationType) throws IOException;
/**
* Upload part of a multi-partition file.
* @param request request
* @return the result of the operation.
* @throws IOException on problems
*/
@Retries.RetryTranslated
UploadPartResult uploadPart(UploadPartRequest request)
throws IOException;
/**
* Get the configuration of this instance; essentially the owning
* filesystem configuration.
* @return the configuration.
*/
Configuration getConf();
/**
* Create a S3 Select request for the destination path.
* This does not build the query.
* @param path pre-qualified path for query
* @return the request
*/
SelectObjectContentRequest newSelectRequest(Path path);
/**
* Execute an S3 Select operation.
* On a failure, the request is only logged at debug to avoid the
* select exception being printed.
* @param source source for selection
* @param request Select request to issue.
* @param action the action for use in exception creation
* @return response
* @throws IOException failure
*/
@Retries.RetryTranslated
SelectObjectContentResult select(
Path source,
SelectObjectContentRequest request,
String action)
throws IOException;
}
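As a hedged sketch of how a consumer might compose these operations, the following performs a single-part multipart upload. writeOps is an assumed WriteOperations instance; the key and payload are invented examples, not part of the patch.
// Sketch only: initiate, upload one part, then complete with retries.
String key = "example/object";
byte[] data = "payload".getBytes(StandardCharsets.UTF_8);
String uploadId = writeOps.initiateMultiPartUpload(key);
UploadPartRequest request = writeOps.newUploadPartRequest(
    key, uploadId, 1, data.length, new ByteArrayInputStream(data), null, 0L);
UploadPartResult part = writeOps.uploadPart(request);
List<PartETag> etags = new ArrayList<>();
etags.add(new PartETag(1, part.getETag()));
writeOps.completeMPUwithRetries(key, uploadId, etags, data.length,
    new AtomicInteger(0));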

View File

@ -73,4 +73,12 @@ public interface ContextAccessors {
*/
@Retries.RetryTranslated
String getBucketLocation() throws IOException;
/**
* Qualify a path.
*
* @param path path to qualify/normalize
* @return possibly new path.
*/
Path makeQualified(Path path);
}

View File

@ -0,0 +1,420 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BBPartHandle;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
import org.apache.hadoop.fs.s3a.WriteOperations;
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatistics;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
/**
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
* upload mechanism.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3AMultipartUploader extends AbstractMultipartUploader {
private final S3AMultipartUploaderBuilder builder;
/** Header for serialized Parts: {@value}. */
public static final String HEADER = "S3A-part01";
private final WriteOperations writeOperations;
private final StoreContext context;
private final S3AMultipartUploaderStatistics statistics;
/**
* Bulk state; created on demand and then retained.
*/
private BulkOperationState operationState;
/**
* Was an operation state requested but not returned?
*/
private boolean noOperationState;
/**
* Instantiate; this is called by the builder.
* @param builder builder
* @param writeOperations writeOperations
* @param context s3a context
* @param statistics statistics callbacks
*/
S3AMultipartUploader(
final S3AMultipartUploaderBuilder builder,
final WriteOperations writeOperations,
final StoreContext context,
final S3AMultipartUploaderStatistics statistics) {
super(context.makeQualified(builder.getPath()));
this.builder = builder;
this.writeOperations = writeOperations;
this.context = context;
this.statistics = statistics;
}
@Override
public void close() throws IOException {
if (operationState != null) {
operationState.close();
}
super.close();
}
/**
* Retrieve the operation state; create one on demand if needed
* <i>and there has been no unsuccessful attempt to create one.</i>
* @return an active operation state.
* @throws IOException failure
*/
private synchronized BulkOperationState retrieveOperationState()
throws IOException {
if (operationState == null && !noOperationState) {
operationState = writeOperations.initiateOperation(getBasePath(),
BulkOperationState.OperationType.Upload);
noOperationState = operationState != null;
}
return operationState;
}
@Override
public CompletableFuture<UploadHandle> startUpload(
final Path filePath)
throws IOException {
Path dest = context.makeQualified(filePath);
checkPath(dest);
String key = context.pathToKey(dest);
return context.submit(new CompletableFuture<>(),
() -> {
String uploadId = writeOperations.initiateMultiPartUpload(key);
statistics.uploadStarted();
return BBUploadHandle.from(ByteBuffer.wrap(
uploadId.getBytes(Charsets.UTF_8)));
});
}
@Override
public CompletableFuture<PartHandle> putPart(
final UploadHandle uploadId,
final int partNumber,
final Path filePath,
final InputStream inputStream,
final long lengthInBytes)
throws IOException {
Path dest = context.makeQualified(filePath);
checkPutArguments(dest, inputStream, partNumber, uploadId,
lengthInBytes);
byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
String key = context.pathToKey(dest);
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
return context.submit(new CompletableFuture<>(),
() -> {
UploadPartRequest request = writeOperations.newUploadPartRequest(key,
uploadIdString, partNumber, (int) lengthInBytes, inputStream,
null, 0L);
UploadPartResult result = writeOperations.uploadPart(request);
statistics.partPut(lengthInBytes);
String eTag = result.getETag();
return BBPartHandle.from(
ByteBuffer.wrap(
buildPartHandlePayload(
filePath.toUri().toString(),
uploadIdString,
result.getPartNumber(),
eTag,
lengthInBytes)));
});
}
@Override
public CompletableFuture<PathHandle> complete(
final UploadHandle uploadHandle,
final Path filePath,
final Map<Integer, PartHandle> handleMap)
throws IOException {
Path dest = context.makeQualified(filePath);
checkPath(dest);
byte[] uploadIdBytes = uploadHandle.toByteArray();
checkUploadId(uploadIdBytes);
checkPartHandles(handleMap);
List<Map.Entry<Integer, PartHandle>> handles =
new ArrayList<>(handleMap.entrySet());
handles.sort(Comparator.comparingInt(Map.Entry::getKey));
int count = handles.size();
String key = context.pathToKey(dest);
String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
ArrayList<PartETag> eTags = new ArrayList<>();
eTags.ensureCapacity(handles.size());
long totalLength = 0;
// built up to identify duplicates - if the size of this set is
// below that of the number of parts, then there's a duplicate entry.
Set<Integer> ids = new HashSet<>(count);
for (Map.Entry<Integer, PartHandle> handle : handles) {
PartHandlePayload payload = parsePartHandlePayload(
handle.getValue().toByteArray());
payload.validate(uploadIdStr, filePath);
ids.add(payload.getPartNumber());
totalLength += payload.getLen();
eTags.add(new PartETag(handle.getKey(), payload.getEtag()));
}
Preconditions.checkArgument(ids.size() == count,
"Duplicate PartHandles");
// retrieve/create operation state for scalability of completion.
final BulkOperationState state = retrieveOperationState();
long finalLen = totalLength;
return context.submit(new CompletableFuture<>(),
() -> {
CompleteMultipartUploadResult result =
writeOperations.commitUpload(
key,
uploadIdStr,
eTags,
finalLen,
state);
byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
statistics.uploadCompleted();
return (PathHandle) () -> ByteBuffer.wrap(eTag);
});
}
@Override
public CompletableFuture<Void> abort(
final UploadHandle uploadId,
final Path filePath)
throws IOException {
Path dest = context.makeQualified(filePath);
checkPath(dest);
final byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
return context.submit(new CompletableFuture<>(),
() -> {
writeOperations.abortMultipartCommit(
context.pathToKey(dest),
uploadIdString);
statistics.uploadAborted();
return null;
});
}
/**
   * Abort all multipart uploads (MPUs) under the path.
* @param path path to abort uploads under.
   * @return a future which eventually returns the number of uploads aborted
* @throws IOException submission failure
*/
@Override
public CompletableFuture<Integer> abortUploadsUnderPath(final Path path)
throws IOException {
statistics.abortUploadsUnderPathInvoked();
return context.submit(new CompletableFuture<>(),
() ->
writeOperations.abortMultipartUploadsUnderPath(
context.pathToKey(path)));
}
/**
* Build the payload for marshalling.
*
   * @param path destination path, as a URI string
   * @param uploadId ID of the multipart upload
   * @param partNumber part number from response
* @param etag upload etag
* @param len length
* @return a byte array to marshall.
* @throws IOException error writing the payload
*/
@VisibleForTesting
static byte[] buildPartHandlePayload(
final String path,
final String uploadId,
final int partNumber,
final String etag,
final long len)
throws IOException {
return new PartHandlePayload(path, uploadId, partNumber, len, etag)
.toBytes();
}
/**
* Parse the payload marshalled as a part handle.
* @param data handle data
   * @return the parsed payload
* @throws IOException error reading the payload
*/
@VisibleForTesting
static PartHandlePayload parsePartHandlePayload(
final byte[] data)
throws IOException {
try (DataInputStream input =
new DataInputStream(new ByteArrayInputStream(data))) {
final String header = input.readUTF();
if (!HEADER.equals(header)) {
throw new IOException("Wrong header string: \"" + header + "\"");
}
final String path = input.readUTF();
final String uploadId = input.readUTF();
final int partNumber = input.readInt();
final long len = input.readLong();
final String etag = input.readUTF();
if (len < 0) {
throw new IOException("Negative length");
}
return new PartHandlePayload(path, uploadId, partNumber, len, etag);
}
}
/**
* Payload of a part handle; serializes
* the fields using DataInputStream and DataOutputStream.
*/
@VisibleForTesting
static final class PartHandlePayload {
private final String path;
private final String uploadId;
private final int partNumber;
private final long len;
private final String etag;
private PartHandlePayload(
final String path,
final String uploadId,
final int partNumber,
final long len,
final String etag) {
Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
"Empty etag");
Preconditions.checkArgument(StringUtils.isNotEmpty(path),
"Empty path");
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"Empty uploadId");
Preconditions.checkArgument(len >= 0,
"Invalid length");
this.path = path;
this.uploadId = uploadId;
this.partNumber = partNumber;
this.len = len;
this.etag = etag;
}
public String getPath() {
return path;
}
public int getPartNumber() {
return partNumber;
}
public long getLen() {
return len;
}
public String getEtag() {
return etag;
}
public String getUploadId() {
return uploadId;
}
public byte[] toBytes()
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
"Empty etag");
Preconditions.checkArgument(len >= 0,
"Invalid length");
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (DataOutputStream output = new DataOutputStream(bytes)) {
output.writeUTF(HEADER);
output.writeUTF(path);
output.writeUTF(uploadId);
output.writeInt(partNumber);
output.writeLong(len);
output.writeUTF(etag);
}
return bytes.toByteArray();
}
public void validate(String uploadIdStr, Path filePath)
throws PathIOException {
String destUri = filePath.toUri().toString();
if (!destUri.equals(path)) {
throw new PathIOException(destUri,
"Multipart part path mismatch: " + path);
}
if (!uploadIdStr.equals(uploadId)) {
throw new PathIOException(destUri,
"Multipart part ID mismatch: " + uploadId);
}
}
}
}
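
For orientation, a minimal client-side sketch of driving the uploader above through the new builder API. It is a sketch only: it assumes a FileSystem instance that exposes createMultipartUploader(Path) and is backed by a store with multipart support; the class name, destination path and part contents are illustrative.

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

/** Illustrative sketch, not part of this patch. */
public class MultipartUploadSketch {

  static PathHandle uploadInTwoParts(FileSystem fs, Path dest,
      byte[] part1, byte[] part2) throws Exception {
    // On S3, every part except the last must be at least 5MB.
    MultipartUploader uploader = fs.createMultipartUploader(dest).build();
    UploadHandle upload = uploader.startUpload(dest).get();
    Map<Integer, PartHandle> parts = new HashMap<>();
    // Part numbers start at 1; each putPart() streams one block to the store.
    parts.put(1, uploader.putPart(upload, 1, dest,
        new ByteArrayInputStream(part1), part1.length).get());
    parts.put(2, uploader.putPart(upload, 2, dest,
        new ByteArrayInputStream(part2), part2.length).get());
    // complete() validates and sorts the part handles, then commits the
    // object; abort(upload, dest) would discard the upload instead.
    return uploader.complete(upload, dest, parts).get();
  }
}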

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import javax.annotation.Nonnull;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.MultipartUploaderBuilderImpl;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperations;
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatistics;
/**
* Builder for S3A multipart uploaders.
*/
public class S3AMultipartUploaderBuilder extends
MultipartUploaderBuilderImpl<S3AMultipartUploader, S3AMultipartUploaderBuilder> {
private final WriteOperations writeOperations;
private final StoreContext context;
private final S3AMultipartUploaderStatistics statistics;
public S3AMultipartUploaderBuilder(
@Nonnull final S3AFileSystem fileSystem,
@Nonnull final WriteOperations writeOperations,
@Nonnull final StoreContext context,
@Nonnull final Path p,
@Nonnull final S3AMultipartUploaderStatistics statistics) {
super(fileSystem, p);
this.writeOperations = writeOperations;
this.context = context;
this.statistics = statistics;
}
@Override
public S3AMultipartUploaderBuilder getThisBuilder() {
return this;
}
@Override
public S3AMultipartUploader build()
throws IllegalArgumentException, IOException {
return new S3AMultipartUploader(this, writeOperations, context, statistics);
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.impl;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -37,6 +39,7 @@ import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
/**
@ -49,9 +52,10 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
* their own.
*
* <i>Warning:</i> this really is private and unstable. Do not use
* outside the org.apache.hadoop.fs.s3a package.
* outside the org.apache.hadoop.fs.s3a package, or in extension points
* such as DelegationTokens.
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate("S3A Filesystem and extensions")
@InterfaceStability.Unstable
public class StoreContext {
@ -114,8 +118,7 @@ public class StoreContext {
/**
* Instantiate.
* No attempt to use a builder here as outside tests
* this should only be created in the S3AFileSystem.
* @deprecated as public method: use {@link StoreContextBuilder}.
*/
public StoreContext(
final URI fsURI,
@ -226,6 +229,16 @@ public class StoreContext {
return contextAccessors.pathToKey(path);
}
/**
* Qualify a path.
*
* @param path path to qualify/normalize
* @return possibly new path.
*/
public Path makeQualified(Path path) {
return contextAccessors.makeQualified(path);
}
/**
* Get the storage statistics of this filesystem.
* @return the storage statistics
@ -351,4 +364,20 @@ public class StoreContext {
? k + "/"
: k;
}
/**
* Submit a closure for execution in the executor
* returned by {@link #getExecutor()}.
* @param <T> type of future
* @param future future for the result.
* @param call callable to invoke.
* @return the future passed in
*/
public <T> CompletableFuture<T> submit(
final CompletableFuture<T> future,
final Callable<T> call) {
getExecutor().submit(() ->
LambdaUtils.eval(future, call));
return future;
}
}
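
A short sketch of the pattern callers follow with this helper: the caller creates the CompletableFuture, submit() schedules the callable on the context's shared executor, and that same future is completed with the result or, if the callable throws, completed exceptionally. The wrapper class and the callable are assumed for illustration.

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch; the StoreContext and the remote call are assumed. */
class SubmitSketch {

  private final StoreContext context;

  SubmitSketch(StoreContext context) {
    this.context = context;
  }

  CompletableFuture<String> fetchAsync(Callable<String> remoteCall) {
    // The future is supplied by the caller; submit() runs the callable on
    // the shared executor and completes that future with its outcome.
    return context.submit(new CompletableFuture<>(), remoteCall);
  }
}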

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.net.URI;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Builder for the store context.
*/
public class StoreContextBuilder {
private URI fsURI;
private String bucket;
private Configuration configuration;
private String username;
private UserGroupInformation owner;
private ListeningExecutorService executor;
private int executorCapacity;
private Invoker invoker;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private S3AInputPolicy inputPolicy = S3AInputPolicy.Normal;
private ChangeDetectionPolicy changeDetectionPolicy;
private boolean multiObjectDeleteEnabled = true;
private MetadataStore metadataStore;
private boolean useListV1 = false;
private ContextAccessors contextAccessors;
private ITtlTimeProvider timeProvider;
public StoreContextBuilder setFsURI(final URI fsURI) {
this.fsURI = fsURI;
return this;
}
public StoreContextBuilder setBucket(final String b) {
this.bucket = b;
return this;
}
public StoreContextBuilder setConfiguration(final Configuration conf) {
this.configuration = conf;
return this;
}
public StoreContextBuilder setUsername(final String user) {
this.username = user;
return this;
}
public StoreContextBuilder setOwner(final UserGroupInformation ugi) {
this.owner = ugi;
return this;
}
public StoreContextBuilder setExecutor(
final ListeningExecutorService ex) {
this.executor = ex;
return this;
}
public StoreContextBuilder setExecutorCapacity(
final int capacity) {
this.executorCapacity = capacity;
return this;
}
public StoreContextBuilder setInvoker(final Invoker invoke) {
this.invoker = invoke;
return this;
}
public StoreContextBuilder setInstrumentation(
final S3AInstrumentation instr) {
this.instrumentation = instr;
return this;
}
public StoreContextBuilder setStorageStatistics(
final S3AStorageStatistics sstats) {
this.storageStatistics = sstats;
return this;
}
public StoreContextBuilder setInputPolicy(
final S3AInputPolicy policy) {
this.inputPolicy = policy;
return this;
}
public StoreContextBuilder setChangeDetectionPolicy(
final ChangeDetectionPolicy policy) {
this.changeDetectionPolicy = policy;
return this;
}
public StoreContextBuilder setMultiObjectDeleteEnabled(
final boolean enabled) {
this.multiObjectDeleteEnabled = enabled;
return this;
}
public StoreContextBuilder setMetadataStore(
final MetadataStore store) {
this.metadataStore = store;
return this;
}
public StoreContextBuilder setUseListV1(
final boolean useV1) {
this.useListV1 = useV1;
return this;
}
public StoreContextBuilder setContextAccessors(
final ContextAccessors accessors) {
this.contextAccessors = accessors;
return this;
}
public StoreContextBuilder setTimeProvider(
final ITtlTimeProvider provider) {
this.timeProvider = provider;
return this;
}
@SuppressWarnings("deprecation")
public StoreContext build() {
return new StoreContext(fsURI,
bucket,
configuration,
username,
owner,
executor,
executorCapacity,
invoker,
instrumentation,
storageStatistics,
inputPolicy,
changeDetectionPolicy,
multiObjectDeleteEnabled,
metadataStore,
useListV1,
contextAccessors,
timeProvider);
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl.statistics;
import java.io.Closeable;
/**
* Statistics for the S3A multipart uploader.
*/
public interface S3AMultipartUploaderStatistics extends Closeable {
void instantiated();
void uploadStarted();
void partPut(long lengthInBytes);
void uploadCompleted();
void uploadAborted();
void abortUploadsUnderPathInvoked();
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl.statistics;
import java.io.IOException;
import java.util.function.BiConsumer;
import org.apache.hadoop.fs.s3a.Statistic;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_INSTANTIATED;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_PART_PUT;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_PART_PUT_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_STARTED;
/**
* Implementation of the uploader statistics.
 * It takes a callback that increments a counter by a given value and
 * invokes it as events occur, so it can be bound to arbitrary
 * statistic collectors.
*/
public final class S3AMultipartUploaderStatisticsImpl implements
S3AMultipartUploaderStatistics {
/**
* The operation to increment a counter/statistic by a value.
*/
private final BiConsumer<Statistic, Long> incrementCallback;
/**
* Constructor.
* @param incrementCallback The operation to increment a
* counter/statistic by a value.
*/
public S3AMultipartUploaderStatisticsImpl(
final BiConsumer<Statistic, Long> incrementCallback) {
this.incrementCallback = incrementCallback;
}
private void inc(Statistic op, long count) {
incrementCallback.accept(op, count);
}
@Override
public void instantiated() {
inc(MULTIPART_INSTANTIATED, 1);
}
@Override
public void uploadStarted() {
inc(MULTIPART_UPLOAD_STARTED, 1);
}
@Override
public void partPut(final long lengthInBytes) {
inc(MULTIPART_PART_PUT, 1);
inc(MULTIPART_PART_PUT_BYTES, lengthInBytes);
}
@Override
public void uploadCompleted() {
inc(MULTIPART_UPLOAD_COMPLETED, 1);
}
@Override
public void uploadAborted() {
inc(MULTIPART_UPLOAD_ABORTED, 1);
}
@Override
public void abortUploadsUnderPathInvoked() {
inc(MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED, 1);
}
@Override
public void close() throws IOException {
}
}
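
A hedged sketch of the "bound to arbitrary statistic collectors" point: any BiConsumer of Statistic and Long can act as the sink, for example an in-memory EnumMap of counters. The class and method names below are illustrative only.

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatistics;
import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl;

/** Illustrative sketch: an in-memory counter map standing in for real instrumentation. */
class UploaderStatisticsSketch {

  static S3AMultipartUploaderStatistics boundTo(
      Map<Statistic, AtomicLong> counters) {
    // Any BiConsumer<Statistic, Long> can serve as the increment callback.
    return new S3AMultipartUploaderStatisticsImpl(
        (stat, count) -> counters
            .computeIfAbsent(stat, s -> new AtomicLong())
            .addAndGet(count));
  }

  static void demo() {
    Map<Statistic, AtomicLong> counters = new EnumMap<>(Statistic.class);
    S3AMultipartUploaderStatistics stats = boundTo(counters);
    stats.uploadStarted();       // increments MULTIPART_UPLOAD_STARTED
    stats.partPut(8_388_608L);   // increments the part count and byte count
  }
}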

View File

@ -102,5 +102,9 @@ public class BulkOperationState implements Closeable {
* Mkdir operation.
*/
Mkdir,
/**
* Multipart upload operation.
*/
Upload
}
}

View File

@ -912,17 +912,27 @@ public class DynamoDBMetadataStore implements MetadataStore,
DDBPathMetadata oldEntry = ancestorState.put(path, entry);
boolean addAncestors = true;
if (oldEntry != null) {
if (!oldEntry.getFileStatus().isDirectory()
|| !entry.getFileStatus().isDirectory()) {
// check for and warn if the existing bulk operation overwrote it.
// this should never occur outside tests explicitly creating it
// check for and warn if the existing bulk operation has an inconsistent
// entry.
// two directories or two files are both allowed.
      // file-over-file can happen with multipart uploaders when the same
      // uploader writes file entries for the same destination more than
      // once as part of its bulk operation.
boolean oldWasDir = oldEntry.getFileStatus().isDirectory();
boolean newIsDir = entry.getFileStatus().isDirectory();
if ((oldWasDir && !newIsDir)
|| (!oldWasDir && newIsDir)) {
LOG.warn("Overwriting a S3Guard file created in the operation: {}",
oldEntry);
LOG.warn("With new entry: {}", entry);
// restore the old state
ancestorState.put(path, oldEntry);
// then raise an exception
throw new PathIOException(path.toString(), E_INCONSISTENT_UPDATE);
throw new PathIOException(path.toString(),
String.format("%s old %s new %s",
E_INCONSISTENT_UPDATE,
oldEntry,
entry));
} else {
// a directory is already present. Log and continue.
LOG.debug("Directory at {} being updated with value {}",

View File

@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3a.S3AMultipartUploader
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory

View File

@ -15,25 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_SCALE_TESTS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_HUGE_PARTITION_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_SCALE_TESTS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
/**
* Test MultipartUploader with S3A.
* <p></p>
 * Although not an S3A scale test subclass, it uses the -Dscale option
 * to enable it, and the partition size option to control the size of
 * the parts uploaded.
@ -41,14 +44,11 @@ import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_H
public class ITestS3AContractMultipartUploader extends
AbstractContractMultipartUploaderTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class);
private int partitionSize;
/**
* S3 requires a minimum part size of 5MB (except the last part).
* @return 5MB
* @return 5MB+ value
*/
@Override
protected int partSizeInBytes() {
@ -126,37 +126,15 @@ public class ITestS3AContractMultipartUploader extends
DEFAULT_HUGE_PARTITION_SIZE);
}
/**
* Extend superclass teardown with actions to help clean up the S3 store,
* including aborting uploads under the test path.
*/
@Override
public void teardown() throws Exception {
Path teardown = path("teardown").getParent();
S3AFileSystem fs = getFileSystem();
if (fs != null) {
WriteOperationHelper helper = fs.getWriteOperationHelper();
try {
LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
int count = helper.abortMultipartUploadsUnderPath(
fs.pathToKey(teardown));
LOG.info("Found {} incomplete uploads", count);
} catch (Exception e) {
LOG.warn("Exeception in teardown", e);
}
}
super.teardown();
}
/**
* S3 has no concept of directories, so this test does not apply.
*/
public void testDirectoryInTheWay() throws Exception {
// no-op
skip("unsupported");
}
@Override
public void testMultipartUploadReverseOrder() throws Exception {
ContractTestUtils.skip("skipped for speed");
skip("skipped for speed");
}
}

View File

@ -203,29 +203,31 @@ public class TestPartialDeleteFailures {
OperationTrackingStore store) throws URISyntaxException, IOException {
URI name = new URI("s3a://bucket");
Configuration conf = new Configuration();
return new StoreContext(
name,
"bucket",
conf,
"alice",
UserGroupInformation.getCurrentUser(),
BlockingThreadPoolExecutorService.newInstance(
return new StoreContextBuilder().setFsURI(name)
.setBucket("bucket")
.setConfiguration(conf)
.setUsername("alice")
.setOwner(UserGroupInformation.getCurrentUser())
.setExecutor(BlockingThreadPoolExecutorService.newInstance(
4,
4,
10, TimeUnit.SECONDS,
"s3a-transfer-shared"),
Constants.DEFAULT_EXECUTOR_CAPACITY,
new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT),
new S3AInstrumentation(name),
new S3AStorageStatistics(),
S3AInputPolicy.Normal,
ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.ETag, false),
multiDelete,
store,
false,
CONTEXT_ACCESSORS,
new S3Guard.TtlTimeProvider(conf));
"s3a-transfer-shared"))
.setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
.setInvoker(
new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
.setInstrumentation(new S3AInstrumentation(name))
.setStorageStatistics(new S3AStorageStatistics())
.setInputPolicy(S3AInputPolicy.Normal)
.setChangeDetectionPolicy(
ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.ETag, false))
.setMultiObjectDeleteEnabled(multiDelete)
.setMetadataStore(store)
.setUseListV1(false)
.setContextAccessors(CONTEXT_ACCESSORS)
.setTimeProvider(new S3Guard.TtlTimeProvider(conf))
.build();
}
private static class MinimalContextAccessor implements ContextAccessors {
@ -251,6 +253,10 @@ public class TestPartialDeleteFailures {
return null;
}
@Override
public Path makeQualified(final Path path) {
return path;
}
}
/**
* MetadataStore which tracks what is deleted and added.

View File

@ -16,51 +16,60 @@
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.fs.s3a.impl;
import java.io.EOFException;
import java.io.IOException;
import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*;
import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;
import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.PartHandlePayload;
import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.buildPartHandlePayload;
import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.parsePartHandlePayload;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test multipart upload support methods and classes.
* Unit test of multipart upload support methods and classes.
*/
public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
public static final String PATH = "s3a://bucket/path";
public static final String UPLOAD = "01";
@Test
public void testRoundTrip() throws Throwable {
Pair<Long, String> result = roundTrip("tag", 1);
assertEquals("tag", result.getRight());
assertEquals(1, result.getLeft().longValue());
PartHandlePayload result = roundTrip(999, "tag", 1);
assertEquals(PATH, result.getPath());
assertEquals(UPLOAD, result.getUploadId());
assertEquals(999, result.getPartNumber());
assertEquals("tag", result.getEtag());
assertEquals(1, result.getLen());
}
@Test
public void testRoundTrip2() throws Throwable {
long len = 1L + Integer.MAX_VALUE;
Pair<Long, String> result = roundTrip("11223344",
len);
assertEquals("11223344", result.getRight());
assertEquals(len, result.getLeft().longValue());
PartHandlePayload result =
roundTrip(1, "11223344", len);
assertEquals(1, result.getPartNumber());
assertEquals("11223344", result.getEtag());
assertEquals(len, result.getLen());
}
@Test
public void testNoEtag() throws Throwable {
intercept(IllegalArgumentException.class,
() -> buildPartHandlePayload("", 1));
() -> buildPartHandlePayload(PATH, UPLOAD,
0, "", 1));
}
@Test
public void testNoLen() throws Throwable {
intercept(IllegalArgumentException.class,
() -> buildPartHandlePayload("tag", -1));
() -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1));
}
@Test
@ -71,14 +80,17 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
@Test
public void testBadHeader() throws Throwable {
byte[] bytes = buildPartHandlePayload("tag", 1);
bytes[2]='f';
byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1);
bytes[2] = 'f';
intercept(IOException.class, "header",
() -> parsePartHandlePayload(bytes));
}
private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException {
byte[] bytes = buildPartHandlePayload(tag, len);
private PartHandlePayload roundTrip(
int partNumber,
String tag,
long len) throws IOException {
byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len);
return parsePartHandlePayload(bytes);
}
}