diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java index 94c78610169..b77c244220a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java @@ -19,21 +19,23 @@ package org.apache.hadoop.fs; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import static org.apache.hadoop.fs.Path.mergePaths; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** * A MultipartUploader that uses the basic FileSystem commands. @@ -70,7 +72,8 @@ public class FileSystemMultipartUploader extends MultipartUploader { public PartHandle putPart(Path filePath, InputStream inputStream, int partNumber, UploadHandle uploadId, long lengthInBytes) throws IOException { - + checkPutArguments(filePath, inputStream, partNumber, uploadId, + lengthInBytes); byte[] uploadIdByteArray = uploadId.toByteArray(); checkUploadId(uploadIdByteArray); Path collectorPath = new Path(new String(uploadIdByteArray, 0, @@ -82,16 +85,17 @@ public class FileSystemMultipartUploader extends MultipartUploader { fs.createFile(partPath).build()) { IOUtils.copy(inputStream, fsDataOutputStream, 4096); } finally { - org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream); + cleanupWithLogger(LOG, inputStream); } return BBPartHandle.from(ByteBuffer.wrap( partPath.toString().getBytes(Charsets.UTF_8))); } private Path createCollectorPath(Path filePath) { + String uuid = UUID.randomUUID().toString(); return mergePaths(filePath.getParent(), mergePaths(new Path(filePath.getName().split("\\.")[0]), - mergePaths(new Path("_multipart"), + mergePaths(new Path("_multipart_" + uuid), new Path(Path.SEPARATOR)))); } @@ -110,21 +114,16 @@ public class FileSystemMultipartUploader extends MultipartUploader { @Override @SuppressWarnings("deprecation") // rename w/ OVERWRITE - public PathHandle complete(Path filePath, - List> handles, UploadHandle multipartUploadId) - throws IOException { + public PathHandle complete(Path filePath, Map handleMap, + UploadHandle multipartUploadId) throws IOException { checkUploadId(multipartUploadId.toByteArray()); - if (handles.isEmpty()) { - throw new IOException("Empty upload"); - } - // If destination already exists, we believe we already completed it. - if (fs.exists(filePath)) { - return getPathHandle(filePath); - } + checkPartHandles(handleMap); + List> handles = + new ArrayList<>(handleMap.entrySet()); + handles.sort(Comparator.comparingInt(Map.Entry::getKey)); - handles.sort(Comparator.comparing(Pair::getKey)); List partHandles = handles .stream() .map(pair -> { @@ -134,7 +133,10 @@ public class FileSystemMultipartUploader extends MultipartUploader { }) .collect(Collectors.toList()); - Path collectorPath = createCollectorPath(filePath); + byte[] uploadIdByteArray = multipartUploadId.toByteArray(); + Path collectorPath = new Path(new String(uploadIdByteArray, 0, + uploadIdByteArray.length, Charsets.UTF_8)); + boolean emptyFile = totalPartsLen(partHandles) == 0; if (emptyFile) { fs.create(filePath).close(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java index b56dbf880f2..7ed987eed90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java @@ -17,39 +17,44 @@ */ package org.apache.hadoop.fs; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.util.List; +import java.util.Map; -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang3.tuple.Pair; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import static com.google.common.base.Preconditions.checkArgument; + /** * MultipartUploader is an interface for copying files multipart and across * multiple nodes. Users should: *
    - *
  1. Initialize an upload
  2. - *
  3. Upload parts in any order
  4. + *
  5. Initialize an upload.
  6. + *
  7. Upload parts in any order.
  8. *
  9. Complete the upload in order to have it materialize in the destination - * FS
  10. + * FS. *
- * - * Implementers should make sure that the complete function should make sure - * that 'complete' will reorder parts if the destination FS doesn't already - * do it for them. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public abstract class MultipartUploader { +public abstract class MultipartUploader implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(MultipartUploader.class); + /** + * Perform any cleanup. + * The upload is not required to support any operations after this. + * @throws IOException problems on close. + */ + @Override + public void close() throws IOException { + } + /** * Initialize a multipart upload. * @param filePath Target path for upload. @@ -59,8 +64,8 @@ public abstract class MultipartUploader { public abstract UploadHandle initialize(Path filePath) throws IOException; /** - * Put part as part of a multipart upload. It should be possible to have - * parts uploaded in any order (or in parallel). + * Put part as part of a multipart upload. + * It is possible to have parts uploaded in any order (or in parallel). * @param filePath Target path for upload (same as {@link #initialize(Path)}). * @param inputStream Data for this part. Implementations MUST close this * stream after reading in the data. @@ -77,15 +82,15 @@ public abstract class MultipartUploader { /** * Complete a multipart upload. * @param filePath Target path for upload (same as {@link #initialize(Path)}. - * @param handles non-empty list of identifiers with associated part numbers + * @param handles non-empty map of part number to part handle. * from {@link #putPart(Path, InputStream, int, UploadHandle, long)}. - * Depending on the backend, the list order may be significant. * @param multipartUploadId Identifier from {@link #initialize(Path)}. * @return unique PathHandle identifier for the uploaded file. - * @throws IOException IO failure or the handle list is empty. + * @throws IOException IO failure */ public abstract PathHandle complete(Path filePath, - List> handles, UploadHandle multipartUploadId) + Map handles, + UploadHandle multipartUploadId) throws IOException; /** @@ -98,13 +103,52 @@ public abstract class MultipartUploader { throws IOException; /** - * Utility method to validate uploadIDs - * @param uploadId - * @throws IllegalArgumentException + * Utility method to validate uploadIDs. + * @param uploadId Upload ID + * @throws IllegalArgumentException invalid ID */ protected void checkUploadId(byte[] uploadId) throws IllegalArgumentException { - Preconditions.checkArgument(uploadId.length > 0, + checkArgument(uploadId != null, "null uploadId"); + checkArgument(uploadId.length > 0, "Empty UploadId is not valid"); } + + /** + * Utility method to validate partHandles. + * @param partHandles handles + * @throws IllegalArgumentException if the parts are invalid + */ + protected void checkPartHandles(Map partHandles) { + checkArgument(!partHandles.isEmpty(), + "Empty upload"); + partHandles.keySet() + .stream() + .forEach(key -> + checkArgument(key > 0, + "Invalid part handle index %s", key)); + } + + /** + * Check all the arguments to the + * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation. + * @param filePath Target path for upload (same as {@link #initialize(Path)}). + * @param inputStream Data for this part. Implementations MUST close this + * stream after reading in the data. + * @param partNumber Index of the part relative to others. + * @param uploadId Identifier from {@link #initialize(Path)}. + * @param lengthInBytes Target length to read from the stream. + * @throws IllegalArgumentException invalid argument + */ + protected void checkPutArguments(Path filePath, + InputStream inputStream, + int partNumber, + UploadHandle uploadId, + long lengthInBytes) throws IllegalArgumentException { + checkArgument(filePath != null, "null filePath"); + checkArgument(inputStream != null, "null inputStream"); + checkArgument(partNumber > 0, "Invalid part number: %d", partNumber); + checkArgument(uploadId != null, "null uploadId"); + checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java index 8b3523272f8..e35b6bf18bb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java @@ -52,6 +52,13 @@ public abstract class MultipartUploaderFactory { } } + /** + * Get the multipart loader for a specific filesystem. + * @param fs filesystem + * @param conf configuration + * @return an uploader, or null if one was found. + * @throws IOException failure during the creation process. + */ public static MultipartUploader get(FileSystem fs, Configuration conf) throws IOException { MultipartUploader mpu = null; diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md index 532b6c7b688..6b4399ea212 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md @@ -36,3 +36,4 @@ HDFS as these are commonly expected by Hadoop client applications. 1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html) 2. [Testing with the Filesystem specification](testing.html) 2. [Extending the specification and its tests](extending.html) +1. [Uploading a file using Multiple Parts](multipartuploader.html) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md new file mode 100644 index 00000000000..629c0c418fd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md @@ -0,0 +1,235 @@ + + + + + + + +# class `org.apache.hadoop.fs.MultipartUploader` + + + +The abstract `MultipartUploader` class is the original class to upload a file +using multiple parts to Hadoop-supported filesystems. The benefits of a +multipart upload is that the file can be uploaded from multiple clients or +processes in parallel and the results will not be visible to other clients until +the `complete` function is called. + +When implemented by an object store, uploaded data may incur storage charges, +even before it is visible in the filesystems. Users of this API must be diligent +and always perform best-effort attempts to complete or abort the upload. + +## Invariants + +All the requirements of a valid MultipartUploader are considered implicit +econditions and postconditions: +all operations on a valid MultipartUploader MUST result in a new +MultipartUploader that is also valid. + +The operations of a single multipart upload may take place across different +instance of a multipart uploader, across different processes and hosts. +It is therefore a requirement that: + +1. All state needed to upload a part, complete an upload or abort an upload +must be contained within or retrievable from an upload handle. + +1. If an upload handle is marshalled to another process, then, if the +receiving process has the correct permissions, it may participate in the +upload, by uploading one or more parts, by completing an upload, and/or by +aborting the upload. + +## Concurrency + +Multiple processes may upload parts of a multipart upload simultaneously. + +If a call is made to `initialize(path)` to a destination where an active +upload is in progress, implementations MUST perform one of the two operations. + +* Reject the call as a duplicate. +* Permit both to proceed, with the final output of the file being +that of _exactly one of the two uploads_. + +Which upload succeeds is undefined. Users must not expect consistent +behavior across filesystems, across filesystem instances *or even +across different requests. + +If a multipart upload is completed or aborted while a part upload is in progress, +the in-progress upload, if it has not completed, must not be included in +the final file, in whole or in part. Implementations SHOULD raise an error +in the `putPart()` operation. + +## Model + +A File System which supports Multipart Uploads extends the existing model +`(Directories, Files, Symlinks)` to one of `(Directories, Files, Symlinks, Uploads)` +`Uploads` of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]`. + + +The `Uploads` element of the state tuple is a map of all active uploads. + +```python +Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]` +``` + +An UploadHandle is a non-empty list of bytes. +```python +UploadHandle: List[byte] +len(UploadHandle) > 0 +``` + +Clients *MUST* treat this as opaque. What is core to this features design is that the handle is valid from +across clients: the handle may be serialized on host `hostA`, deserialized on `hostB` and still used +to extend or complete the upload. + +```python +UploadPart = (Path: path, parts: Map[PartHandle -> byte[]]) +``` + +Similarly, the `PartHandle` type is also a non-empty list of opaque bytes, again, marshallable between hosts. + +```python +PartHandle: List[byte] +``` + +It is implicit that each `UploadHandle` in `FS.Uploads` is unique. +Similarly, each `PartHandle` in the map of `[PartHandle -> UploadPart]` must also be unique. + +1. There is no requirement that Part Handles are unique across uploads. +1. There is no requirement that Upload Handles are unique over time. +However, if Part Handles are rapidly recycled, there is a risk that the nominally +idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a +successor operation which used the same Upload Handle. + +## State Changing Operations + +### `UploadHandle initialize(Path path)` + +Initialized a Multipart Upload, returning an upload handle for use in +subsequent operations. + +#### Preconditions + +```python +if path == "/" : raise IOException + +if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException +``` + +If a filesystem does not support concurrent uploads to a destination, +then the following precondition is added + +```python +if path in values(FS.Uploads) raise PathExistsException, IOException + +``` + + +#### Postconditions + +The outcome of this operation is that the filesystem state is updated with a new +active upload, with a new handle, this handle being returned to the caller. + +```python +handle' = UploadHandle where not handle' in keys(FS.Uploads) +FS' = FS where FS'.Uploads(handle') == {} +result = handle' +``` + +### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)` + +Upload a part for the multipart upload. + +#### Preconditions + + +```python +uploadHandle in keys(FS.Uploads) +partNumber >= 1 +lengthInBytes >= 0 +len(inputStream) >= lengthInBytes +``` + +#### Postconditions + +```python +data' = inputStream(0..lengthInBytes) +partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts) +FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data' +result = partHandle' +``` + +The data is stored in the filesystem, pending completion. + + +### `PathHandle complete(Path path, Map parts, UploadHandle multipartUploadId)` + +Complete the multipart upload. + +A Filesystem may enforce a minimum size of each part, excluding the last part uploaded. + +If a part is out of this range, an `IOException` MUST be raised. + +#### Preconditions + +```python +uploadHandle in keys(FS.Uploads) else raise FileNotFoundException +FS.Uploads(uploadHandle).path == path +if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException +parts.size() > 0 +``` + +If there are handles in the MPU which aren't included in the map, then the omitted +parts will not be a part of the resulting file. It is up to the implementation +of the MultipartUploader to make sure the leftover parts are cleaned up. + +In the case of backing stores that support directories (local filesystem, HDFS, +etc), if, at the point of completion, there is now a directory at the +destination then a `PathIsDirectoryException` or other `IOException` must be thrown. + +#### Postconditions + +```python +UploadData' == ordered concatention of all data in the map of parts, ordered by key +exists(FS', path') and result = PathHandle(path') +FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads) +``` + +The PathHandle is returned by the complete operation so subsequent operations +will be able to identify that the data has not changed in the meantime. + +The order of parts in the uploaded by file is that of the natural order of +parts: part 1 is ahead of part 2, etc. + + +### `void abort(Path path, UploadHandle multipartUploadId)` + +Abort a multipart upload. The handle becomes invalid and not subject to reuse. + +#### Preconditions + + +```python +uploadHandle in keys(FS.Uploads) else raise FileNotFoundException +``` + +#### Postconditions + +The upload handle is no longer known. + +```python +FS' = FS where not uploadHandle in keys(FS'.uploads) +``` +A subsequent call to `abort()` with the same handle will fail, unless +the handle has been recycled. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java index 7cee5a60815..7a8f0830eda 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -23,15 +23,19 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; import com.google.common.base.Charsets; +import org.junit.Assume; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BBUploadHandle; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -43,11 +47,64 @@ import org.apache.hadoop.fs.PathHandle; import org.apache.hadoop.fs.UploadHandle; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; import static org.apache.hadoop.test.LambdaTestUtils.intercept; public abstract class AbstractContractMultipartUploaderTest extends AbstractFSContractTestBase { + protected static final Logger LOG = + LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class); + + /** + * Size of very small uploads. + * Enough to be non empty, not big enough to cause delays on uploads. + */ + protected static final int SMALL_FILE = 100; + + private MultipartUploader mpu; + private MultipartUploader mpu2; + private final Random random = new Random(); + private UploadHandle activeUpload; + private Path activeUploadPath; + + protected String getMethodName() { + return methodName.getMethodName(); + } + + @Override + public void setup() throws Exception { + super.setup(); + Configuration conf = getContract().getConf(); + mpu = MultipartUploaderFactory.get(getFileSystem(), conf); + mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf); + } + + @Override + public void teardown() throws Exception { + if (mpu!= null && activeUpload != null) { + try { + mpu.abort(activeUploadPath, activeUpload); + } catch (FileNotFoundException ignored) { + /* this is fine */ + } catch (Exception e) { + LOG.info("in teardown", e); + } + } + cleanupWithLogger(LOG, mpu, mpu2); + super.teardown(); + } + + /** + * Get a test path based on the method name. + * @return a path to use in the test + * @throws IOException failure to build the path name up. + */ + protected Path methodPath() throws IOException { + return path(getMethodName()); + } + /** * The payload is the part number repeated for the length of the part. * This makes checking the correctness of the upload straightforward. @@ -55,9 +112,19 @@ public abstract class AbstractContractMultipartUploaderTest extends * @return the bytes to upload. */ private byte[] generatePayload(int partNumber) { - int sizeInBytes = partSizeInBytes(); - ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes); - for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) { + return generatePayload(partNumber, partSizeInBytes()); + } + + /** + * Generate a payload of a given size; part number is used + * for all the values. + * @param partNumber part number + * @param size size in bytes + * @return the bytes to upload. + */ + private byte[] generatePayload(final int partNumber, final int size) { + ByteBuffer buffer = ByteBuffer.allocate(size); + for (int i=0; i < size /(Integer.SIZE/Byte.SIZE); ++i) { buffer.putInt(partNumber); } return buffer.array(); @@ -70,11 +137,14 @@ public abstract class AbstractContractMultipartUploaderTest extends * @throws IOException failure to read or digest the file. */ protected byte[] digest(Path path) throws IOException { - FileSystem fs = getFileSystem(); - try (InputStream in = fs.open(path)) { + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + try (InputStream in = getFileSystem().open(path)) { byte[] fdData = IOUtils.toByteArray(in); MessageDigest newDigest = DigestUtils.getMd5Digest(); - return newDigest.digest(fdData); + byte[] digest = newDigest.digest(fdData); + return digest; + } finally { + timer.end("Download and digest of path %s", path); } } @@ -92,75 +162,231 @@ public abstract class AbstractContractMultipartUploaderTest extends return 10; } + /** + * How long in milliseconds for propagation of + * store changes, including update/delete/list + * to be everywhere. + * If 0: the FS is consistent. + * @return a time in milliseconds. + */ + protected int timeToBecomeConsistentMillis() { + return 0; + } + + /** + * Does a call to finalize an upload (either complete or abort) consume the + * uploadID immediately or is it reaped at a later point in time? + * @return true if the uploadID will be consumed immediately (and no longer + * resuable). + */ + protected abstract boolean finalizeConsumesUploadIdImmediately(); + + /** + * Does the store support concurrent uploads to the same destination path? + * @return true if concurrent uploads are supported. + */ + protected abstract boolean supportsConcurrentUploadsToSamePath(); + + /** + * Pick a multipart uploader from the index value. + * @param index index of upload + * @return an uploader + */ + protected MultipartUploader mpu(int index) { + return (index % 2 == 0) ? mpu : mpu2; + } + + /** + * Pick a multipart uploader at random. + * @return an uploader + */ + protected MultipartUploader randomMpu() { + return mpu(random.nextInt(10)); + } + /** * Assert that a multipart upload is successful. * @throws Exception failure */ @Test public void testSingleUpload() throws Exception { - FileSystem fs = getFileSystem(); - Path file = path("testSingleUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + Map partHandles = new HashMap<>(); MessageDigest origDigest = DigestUtils.getMd5Digest(); - byte[] payload = generatePayload(1); + int size = SMALL_FILE; + byte[] payload = generatePayload(1, size); origDigest.update(payload); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle, - payload.length); - partHandles.add(Pair.of(1, partHandle)); - PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles, + PartHandle partHandle = putPart(file, uploadHandle, 1, payload); + partHandles.put(1, partHandle); + PathHandle fd = completeUpload(file, uploadHandle, partHandles, origDigest, - payload.length); + size); - // Complete is idempotent - PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle); - assertArrayEquals("Path handles differ", fd.toByteArray(), - fd2.toByteArray()); + if (finalizeConsumesUploadIdImmediately()) { + intercept(FileNotFoundException.class, + () -> mpu.complete(file, partHandles, uploadHandle)); + } else { + PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle); + assertArrayEquals("Path handles differ", fd.toByteArray(), + fd2.toByteArray()); + } } - private PathHandle completeUpload(final Path file, - final MultipartUploader mpu, + /** + * Initialize an upload. + * This saves the path and upload handle as the active + * upload, for aborting in teardown + * @param dest destination + * @return the handle + * @throws IOException failure to initialize + */ + protected UploadHandle initializeUpload(final Path dest) throws IOException { + activeUploadPath = dest; + activeUpload = randomMpu().initialize(dest); + return activeUpload; + } + + /** + * Generate then upload a part. + * @param file destination + * @param uploadHandle handle + * @param index index of part + * @param origDigest digest to build up. May be null + * @return the part handle + * @throws IOException IO failure. + */ + protected PartHandle buildAndPutPart( + final Path file, final UploadHandle uploadHandle, - final List> partHandles, + final int index, + final MessageDigest origDigest) throws IOException { + byte[] payload = generatePayload(index); + if (origDigest != null) { + origDigest.update(payload); + } + return putPart(file, uploadHandle, index, payload); + } + + /** + * Put a part. + * The entire byte array is uploaded. + * @param file destination + * @param uploadHandle handle + * @param index index of part + * @param payload byte array of payload + * @return the part handle + * @throws IOException IO failure. + */ + protected PartHandle putPart(final Path file, + final UploadHandle uploadHandle, + final int index, + final byte[] payload) throws IOException { + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + PartHandle partHandle = mpu(index) + .putPart(file, + new ByteArrayInputStream(payload), + index, + uploadHandle, + payload.length); + timer.end("Uploaded part %s", index); + LOG.info("Upload bandwidth {} MB/s", + timer.bandwidthDescription(payload.length)); + return partHandle; + } + + /** + * Complete an upload with the active MPU instance. + * @param file destination + * @param uploadHandle handle + * @param partHandles map of handles + * @param origDigest digest of source data (may be null) + * @param expectedLength expected length of result. + * @return the path handle from the upload. + * @throws IOException IO failure + */ + private PathHandle completeUpload(final Path file, + final UploadHandle uploadHandle, + final Map partHandles, final MessageDigest origDigest, final int expectedLength) throws IOException { - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + PathHandle fd = complete(file, uploadHandle, partHandles); FileStatus status = verifyPathExists(getFileSystem(), "Completed file", file); assertEquals("length of " + status, expectedLength, status.getLen()); + if (origDigest != null) { + verifyContents(file, origDigest, expectedLength); + } + return fd; + } + + /** + * Verify the contents of a file. + * @param file path + * @param origDigest digest + * @param expectedLength expected length (for logging B/W) + * @throws IOException IO failure + */ + protected void verifyContents(final Path file, + final MessageDigest origDigest, + final int expectedLength) throws IOException { + ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer(); assertArrayEquals("digest of source and " + file + " differ", origDigest.digest(), digest(file)); + timer2.end("Completed digest", file); + LOG.info("Download bandwidth {} MB/s", + timer2.bandwidthDescription(expectedLength)); + } + + /** + * Perform the inner complete without verification. + * @param file destination path + * @param uploadHandle upload handle + * @param partHandles map of parts + * @return the path handle from the upload. + * @throws IOException IO failure + */ + private PathHandle complete(final Path file, + final UploadHandle uploadHandle, + final Map partHandles) throws IOException { + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle); + timer.end("Completed upload to %s", file); return fd; } + /** + * Abort an upload. + * @param file path + * @param uploadHandle handle + * @throws IOException failure + */ + private void abortUpload(final Path file, UploadHandle uploadHandle) + throws IOException { + randomMpu().abort(file, uploadHandle); + } + /** * Assert that a multipart upload is successful. * @throws Exception failure */ @Test public void testMultipartUpload() throws Exception { - FileSystem fs = getFileSystem(); - Path file = path("testMultipartUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + Map partHandles = new HashMap<>(); MessageDigest origDigest = DigestUtils.getMd5Digest(); final int payloadCount = getTestPayloadCount(); for (int i = 1; i <= payloadCount; ++i) { - byte[] payload = generatePayload(i); - origDigest.update(payload); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, - payload.length); - partHandles.add(Pair.of(i, partHandle)); + PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, + origDigest); + partHandles.put(i, partHandle); } - completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + completeUpload(file, uploadHandle, partHandles, origDigest, payloadCount * partSizeInBytes()); } @@ -173,17 +399,33 @@ public abstract class AbstractContractMultipartUploaderTest extends public void testMultipartUploadEmptyPart() throws Exception { FileSystem fs = getFileSystem(); Path file = path("testMultipartUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); - MessageDigest origDigest = DigestUtils.getMd5Digest(); - byte[] payload = new byte[0]; - origDigest.update(payload); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle, - payload.length); - partHandles.add(Pair.of(0, partHandle)); - completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0); + try (MultipartUploader uploader = + MultipartUploaderFactory.get(fs, null)) { + UploadHandle uploadHandle = uploader.initialize(file); + + Map partHandles = new HashMap<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + byte[] payload = new byte[0]; + origDigest.update(payload); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle, + payload.length); + partHandles.put(1, partHandle); + completeUpload(file, uploadHandle, partHandles, origDigest, 0); + } + } + + /** + * Assert that a multipart upload is successful. + * @throws Exception failure + */ + @Test + public void testUploadEmptyBlock() throws Exception { + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + Map partHandles = new HashMap<>(); + partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0])); + completeUpload(file, uploadHandle, partHandles, null, 0); } /** @@ -192,11 +434,9 @@ public abstract class AbstractContractMultipartUploaderTest extends */ @Test public void testMultipartUploadReverseOrder() throws Exception { - FileSystem fs = getFileSystem(); - Path file = path("testMultipartUploadReverseOrder"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + Map partHandles = new HashMap<>(); MessageDigest origDigest = DigestUtils.getMd5Digest(); final int payloadCount = getTestPayloadCount(); for (int i = 1; i <= payloadCount; ++i) { @@ -204,13 +444,9 @@ public abstract class AbstractContractMultipartUploaderTest extends origDigest.update(payload); } for (int i = payloadCount; i > 0; --i) { - byte[] payload = generatePayload(i); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, - payload.length); - partHandles.add(Pair.of(i, partHandle)); + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); } - completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + completeUpload(file, uploadHandle, partHandles, origDigest, payloadCount * partSizeInBytes()); } @@ -222,25 +458,19 @@ public abstract class AbstractContractMultipartUploaderTest extends public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exception { describe("Upload in reverse order and the part numbers are not contiguous"); - FileSystem fs = getFileSystem(); - Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); MessageDigest origDigest = DigestUtils.getMd5Digest(); int payloadCount = 2 * getTestPayloadCount(); for (int i = 2; i <= payloadCount; i += 2) { byte[] payload = generatePayload(i); origDigest.update(payload); } + Map partHandles = new HashMap<>(); for (int i = payloadCount; i > 0; i -= 2) { - byte[] payload = generatePayload(i); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, - payload.length); - partHandles.add(Pair.of(i, partHandle)); + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); } - completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + completeUpload(file, uploadHandle, partHandles, origDigest, getTestPayloadCount() * partSizeInBytes()); } @@ -251,19 +481,14 @@ public abstract class AbstractContractMultipartUploaderTest extends @Test public void testMultipartUploadAbort() throws Exception { describe("Upload and then abort it before completing"); - FileSystem fs = getFileSystem(); - Path file = path("testMultipartUploadAbort"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List> partHandles = new ArrayList<>(); - for (int i = 20; i >= 10; --i) { - byte[] payload = generatePayload(i); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, - payload.length); - partHandles.add(Pair.of(i, partHandle)); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + int end = 10; + Map partHandles = new HashMap<>(); + for (int i = 12; i > 10; i--) { + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); } - mpu.abort(file, uploadHandle); + abortUpload(file, uploadHandle); String contents = "ThisIsPart49\n"; int len = contents.getBytes(Charsets.UTF_8).length; @@ -275,6 +500,15 @@ public abstract class AbstractContractMultipartUploaderTest extends () -> mpu.complete(file, partHandles, uploadHandle)); assertPathDoesNotExist("Uploaded file should not exist", file); + + // A second abort should be an FileNotFoundException if the UploadHandle is + // consumed by finalization operations (complete, abort). + if (finalizeConsumesUploadIdImmediately()) { + intercept(FileNotFoundException.class, + () -> abortUpload(file, uploadHandle)); + } else { + abortUpload(file, uploadHandle); + } } /** @@ -282,13 +516,23 @@ public abstract class AbstractContractMultipartUploaderTest extends */ @Test public void testAbortUnknownUpload() throws Exception { - FileSystem fs = getFileSystem(); - Path file = path("testAbortUnknownUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + Path file = methodPath(); ByteBuffer byteBuffer = ByteBuffer.wrap( "invalid-handle".getBytes(Charsets.UTF_8)); UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); - intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle)); + intercept(FileNotFoundException.class, + () -> abortUpload(file, uploadHandle)); + } + + /** + * Trying to abort with a handle of size 0 must fail. + */ + @Test + public void testAbortEmptyUpload() throws Exception { + describe("initialize upload and abort before uploading data"); + Path file = methodPath(); + abortUpload(file, initializeUpload(file)); + assertPathDoesNotExist("Uploaded file should not exist", file); } /** @@ -296,13 +540,10 @@ public abstract class AbstractContractMultipartUploaderTest extends */ @Test public void testAbortEmptyUploadHandle() throws Exception { - FileSystem fs = getFileSystem(); - Path file = path("testAbortEmptyUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]); UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); intercept(IllegalArgumentException.class, - () -> mpu.abort(file, uploadHandle)); + () -> abortUpload(methodPath(), uploadHandle)); } /** @@ -311,26 +552,20 @@ public abstract class AbstractContractMultipartUploaderTest extends @Test public void testCompleteEmptyUpload() throws Exception { describe("Expect an empty MPU to fail, but still be abortable"); - FileSystem fs = getFileSystem(); - Path dest = path("testCompleteEmptyUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle handle = mpu.initialize(dest); - intercept(IOException.class, - () -> mpu.complete(dest, new ArrayList<>(), handle)); - mpu.abort(dest, handle); + Path dest = methodPath(); + UploadHandle handle = initializeUpload(dest); + intercept(IllegalArgumentException.class, + () -> mpu.complete(dest, new HashMap<>(), handle)); + abortUpload(dest, handle); } /** * When we pass empty uploadID, putPart throws IllegalArgumentException. - * @throws Exception */ @Test public void testPutPartEmptyUploadID() throws Exception { describe("Expect IllegalArgumentException when putPart uploadID is empty"); - FileSystem fs = getFileSystem(); - Path dest = path("testCompleteEmptyUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - mpu.initialize(dest); + Path dest = methodPath(); UploadHandle emptyHandle = BBUploadHandle.from(ByteBuffer.wrap(new byte[0])); byte[] payload = generatePayload(1); @@ -341,25 +576,123 @@ public abstract class AbstractContractMultipartUploaderTest extends /** * When we pass empty uploadID, complete throws IllegalArgumentException. - * @throws Exception */ @Test public void testCompleteEmptyUploadID() throws Exception { describe("Expect IllegalArgumentException when complete uploadID is empty"); - FileSystem fs = getFileSystem(); - Path dest = path("testCompleteEmptyUpload"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle realHandle = mpu.initialize(dest); + Path dest = methodPath(); + UploadHandle realHandle = initializeUpload(dest); UploadHandle emptyHandle = BBUploadHandle.from(ByteBuffer.wrap(new byte[0])); - List> partHandles = new ArrayList<>(); - byte[] payload = generatePayload(1); - InputStream is = new ByteArrayInputStream(payload); - PartHandle partHandle = mpu.putPart(dest, is, 1, realHandle, - payload.length); - partHandles.add(Pair.of(1, partHandle)); + Map partHandles = new HashMap<>(); + PartHandle partHandle = putPart(dest, realHandle, 1, + generatePayload(1, SMALL_FILE)); + partHandles.put(1, partHandle); intercept(IllegalArgumentException.class, () -> mpu.complete(dest, partHandles, emptyHandle)); + + // and, while things are setup, attempt to complete with + // a part index of 0 + partHandles.clear(); + partHandles.put(0, partHandle); + intercept(IllegalArgumentException.class, + () -> mpu.complete(dest, partHandles, realHandle)); + } + + /** + * Assert that upon completion, a directory in the way of the file will + * result in a failure. This test only applies to backing stores with a + * concept of directories. + * @throws Exception failure + */ + @Test + public void testDirectoryInTheWay() throws Exception { + FileSystem fs = getFileSystem(); + Path file = methodPath(); + UploadHandle uploadHandle = initializeUpload(file); + Map partHandles = new HashMap<>(); + int size = SMALL_FILE; + PartHandle partHandle = putPart(file, uploadHandle, 1, + generatePayload(1, size)); + partHandles.put(1, partHandle); + + fs.mkdirs(file); + intercept(IOException.class, + () -> completeUpload(file, uploadHandle, partHandles, null, + size)); + // abort should still work + abortUpload(file, uploadHandle); + } + + @Test + public void testConcurrentUploads() throws Throwable { + + // if the FS doesn't support concurrent uploads, this test is + // required to fail during the second initialization. + final boolean concurrent = supportsConcurrentUploadsToSamePath(); + + describe("testing concurrent uploads, MPU support for this is " + + concurrent); + final FileSystem fs = getFileSystem(); + final Path file = methodPath(); + final int size1 = SMALL_FILE; + final int partId1 = 1; + final byte[] payload1 = generatePayload(partId1, size1); + final MessageDigest digest1 = DigestUtils.getMd5Digest(); + digest1.update(payload1); + final UploadHandle upload1 = initializeUpload(file); + final Map partHandles1 = new HashMap<>(); + + // initiate part 2 + // by using a different size, it's straightforward to see which + // version is visible, before reading/digesting the contents + final int size2 = size1 * 2; + final int partId2 = 2; + final byte[] payload2 = generatePayload(partId1, size2); + final MessageDigest digest2 = DigestUtils.getMd5Digest(); + digest2.update(payload2); + + final UploadHandle upload2; + try { + upload2 = initializeUpload(file); + Assume.assumeTrue( + "The Filesystem is unexpectedly supporting concurrent uploads", + concurrent); + } catch (IOException e) { + if (!concurrent) { + // this is expected, so end the test + LOG.debug("Expected exception raised on concurrent uploads {}", e); + return; + } else { + throw e; + } + } + final Map partHandles2 = new HashMap<>(); + + + assertNotEquals("Upload handles match", upload1, upload2); + + // put part 1 + partHandles1.put(partId1, putPart(file, upload1, partId1, payload1)); + + // put part2 + partHandles2.put(partId2, putPart(file, upload2, partId2, payload2)); + + // complete part u1. expect its size and digest to + // be as expected. + completeUpload(file, upload1, partHandles1, digest1, size1); + + // now upload part 2. + complete(file, upload2, partHandles2); + // and await the visible length to match + eventually(timeToBecomeConsistentMillis(), 500, + () -> { + FileStatus status = fs.getFileStatus(file); + assertEquals("File length in " + status, + size2, status.getLen()); + }); + + verifyContents(file, digest2, size2); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java index a50d2e41b14..6e27964d8e3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java @@ -40,4 +40,14 @@ public class TestLocalFSContractMultipartUploader protected int partSizeInBytes() { return 1024; } + + @Override + protected boolean finalizeConsumesUploadIdImmediately() { + return true; + } + + @Override + protected boolean supportsConcurrentUploadsToSamePath() { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java index f3a5265de75..54f4ed27379 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java @@ -21,6 +21,8 @@ import java.io.IOException; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; @@ -32,6 +34,9 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; public class TestHDFSContractMultipartUploader extends AbstractContractMultipartUploaderTest { + protected static final Logger LOG = + LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class); + @BeforeClass public static void createCluster() throws IOException { HDFSContract.createCluster(); @@ -55,4 +60,14 @@ public class TestHDFSContractMultipartUploader extends protected int partSizeInBytes() { return 1024; } + + @Override + protected boolean finalizeConsumesUploadIdImmediately() { + return true; + } + + @Override + protected boolean supportsConcurrentUploadsToSamePath() { + return true; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java index cab4e2a029e..cf58751ea44 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; @@ -68,10 +70,8 @@ public class S3AMultipartUploader extends MultipartUploader { public static final String HEADER = "S3A-part01"; public S3AMultipartUploader(FileSystem fs, Configuration conf) { - if (!(fs instanceof S3AFileSystem)) { - throw new IllegalArgumentException( - "S3A MultipartUploads must use S3AFileSystem"); - } + Preconditions.checkArgument(fs instanceof S3AFileSystem, + "Wrong filesystem: expected S3A but got %s", fs); s3a = (S3AFileSystem) fs; } @@ -88,6 +88,8 @@ public class S3AMultipartUploader extends MultipartUploader { public PartHandle putPart(Path filePath, InputStream inputStream, int partNumber, UploadHandle uploadId, long lengthInBytes) throws IOException { + checkPutArguments(filePath, inputStream, partNumber, uploadId, + lengthInBytes); byte[] uploadIdBytes = uploadId.toByteArray(); checkUploadId(uploadIdBytes); String key = s3a.pathToKey(filePath); @@ -105,14 +107,16 @@ public class S3AMultipartUploader extends MultipartUploader { @Override public PathHandle complete(Path filePath, - List> handles, UploadHandle uploadId) + Map handleMap, + UploadHandle uploadId) throws IOException { byte[] uploadIdBytes = uploadId.toByteArray(); checkUploadId(uploadIdBytes); - if (handles.isEmpty()) { - throw new IOException("Empty upload"); - } + checkPartHandles(handleMap); + List> handles = + new ArrayList<>(handleMap.entrySet()); + handles.sort(Comparator.comparingInt(Map.Entry::getKey)); final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); @@ -121,11 +125,11 @@ public class S3AMultipartUploader extends MultipartUploader { ArrayList eTags = new ArrayList<>(); eTags.ensureCapacity(handles.size()); long totalLength = 0; - for (Pair handle : handles) { - byte[] payload = handle.getRight().toByteArray(); + for (Map.Entry handle : handles) { + byte[] payload = handle.getValue().toByteArray(); Pair result = parsePartHandlePayload(payload); totalLength += result.getLeft(); - eTags.add(new PartETag(handle.getLeft(), result.getRight())); + eTags.add(new PartETag(handle.getKey(), result.getRight())); } AtomicInteger errorCount = new AtomicInteger(0); CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries( @@ -172,7 +176,7 @@ public class S3AMultipartUploader extends MultipartUploader { throws IOException { Preconditions.checkArgument(StringUtils.isNotEmpty(eTag), "Empty etag"); - Preconditions.checkArgument(len > 0, + Preconditions.checkArgument(len >= 0, "Invalid length"); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); @@ -190,6 +194,7 @@ public class S3AMultipartUploader extends MultipartUploader { * @return the length and etag * @throws IOException error reading the payload */ + @VisibleForTesting static Pair parsePartHandlePayload(byte[] data) throws IOException { @@ -201,7 +206,7 @@ public class S3AMultipartUploader extends MultipartUploader { } final long len = input.readLong(); final String etag = input.readUTF(); - if (len <= 0) { + if (len < 0) { throw new IOException("Negative length"); } return Pair.of(len, etag); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java index 6514ea3dcbc..059312a8103 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.fs.contract.s3a; -import java.io.IOException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.WriteOperationHelper; @@ -35,6 +34,9 @@ import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_H /** * Test MultipartUploader with S3A. + * Although not an S3A Scale test subclass, it uses the -Dscale option + * to enable it, and partition size option to control the size of + * parts uploaded. */ public class ITestS3AContractMultipartUploader extends AbstractContractMultipartUploaderTest { @@ -79,6 +81,35 @@ public class ITestS3AContractMultipartUploader extends return new S3AContract(conf); } + /** + * Bigger test: use the scale timeout. + * @return the timeout for scale tests. + */ + @Override + protected int getTestTimeoutMillis() { + return SCALE_TEST_TIMEOUT_MILLIS; + } + + + @Override + protected boolean supportsConcurrentUploadsToSamePath() { + return true; + } + + /** + * Provide a pessimistic time to become consistent. + * @return a time in milliseconds + */ + @Override + protected int timeToBecomeConsistentMillis() { + return 30 * 1000; + } + + @Override + protected boolean finalizeConsumesUploadIdImmediately() { + return false; + } + @Override public void setup() throws Exception { super.setup(); @@ -103,19 +134,29 @@ public class ITestS3AContractMultipartUploader extends public void teardown() throws Exception { Path teardown = path("teardown").getParent(); S3AFileSystem fs = getFileSystem(); - WriteOperationHelper helper = fs.getWriteOperationHelper(); - try { - LOG.info("Teardown: aborting outstanding uploads under {}", teardown); - int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown)); - LOG.info("Found {} incomplete uploads", count); - } catch (IOException e) { - LOG.warn("IOE in teardown", e); + if (fs != null) { + WriteOperationHelper helper = fs.getWriteOperationHelper(); + try { + LOG.info("Teardown: aborting outstanding uploads under {}", teardown); + int count = helper.abortMultipartUploadsUnderPath( + fs.pathToKey(teardown)); + LOG.info("Found {} incomplete uploads", count); + } catch (Exception e) { + LOG.warn("Exeception in teardown", e); + } } super.teardown(); } + /** + * S3 has no concept of directories, so this test does not apply. + */ + public void testDirectoryInTheWay() throws Exception { + // no-op + } + @Override - public void testMultipartUploadEmptyPart() throws Exception { - // ignore the test in the base class. + public void testMultipartUploadReverseOrder() throws Exception { + ContractTestUtils.skip("skipped for speed"); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java index 35d04605262..4825d26eeb0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java @@ -60,7 +60,7 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase { @Test public void testNoLen() throws Throwable { intercept(IllegalArgumentException.class, - () -> buildPartHandlePayload("tag", 0)); + () -> buildPartHandlePayload("tag", -1)); } @Test