HADOOP-9454. Support multipart uploads for s3native. Contributed by Jordan Mendelson and Akira AJISAKA.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1572237 13f79535-47bb-0310-9956-ffa450edef68
commit 4812bd1569 (parent 3406b14755)
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -37,6 +37,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove
     it in trunk. (Haohui Mai via jing9)
 
+    HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
+    Akira AJISAKA via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
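For context, the feature is opt-in. Below is a minimal sketch of how a client could enable it before opening the filesystem; the property names come from the patch below, while the bucket URI and the 128 MB threshold are illustrative assumptions, not part of the patch:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class EnableMultipartExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Off by default; turn on multipart uploads for s3n.
        conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
        // Files at or above this size are uploaded in parts (128 MB assumed here).
        conf.setLong("fs.s3n.multipart.uploads.block.size", 128L * 1024 * 1024);
        FileSystem fs = FileSystem.get(URI.create("s3n://example-bucket/"), conf);
        // fs.create(...) and fs.rename(...) now take the multipart paths for large files.
      }
    }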
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
@@ -28,6 +28,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceException;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageObjectsChunk;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
 import org.jets3t.service.model.StorageObject;
 import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
 
   private S3Service s3Service;
   private S3Bucket bucket;
+
+  private long multipartBlockSize;
+  private boolean multipartEnabled;
+  private long multipartCopyBlockSize;
+  static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+
   public static final Log LOG =
       LogFactory.getLog(Jets3tNativeFileSystemStore.class);
 
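The (long) cast in MAX_PART_SIZE is load-bearing: without it the multiplication is evaluated in int arithmetic and wraps before widening. The 5 GB value also matches Amazon S3's documented upper bound for a single upload part, which is why both configured block sizes are clamped to it below. A quick, self-contained demonstration of the overflow (illustrative, not part of the patch):

    public class PartSizeOverflowDemo {
      public static void main(String[] args) {
        // Evaluated in int arithmetic, the product wraps before widening to long:
        long wrong = 5 * 1024 * 1024 * 1024;         // 1073741824 (1 GB)
        // Casting the first operand forces long arithmetic throughout:
        long right = (long) 5 * 1024 * 1024 * 1024;  // 5368709120 (5 GB)
        System.out.println(wrong + " vs " + right);
      }
    }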
@@ -67,6 +79,15 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     } catch (S3ServiceException e) {
       handleS3ServiceException(e);
     }
+    multipartEnabled =
+        conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+    multipartBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+        MAX_PART_SIZE);
+    multipartCopyBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+        MAX_PART_SIZE);
+
     bucket = new S3Bucket(uri.getHost());
   }
 
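Note how both sizes funnel through Math.min, so even an oversized core-site.xml value cannot push a part past the 5 GB ceiling. A sketch with a hypothetical misconfiguration (illustrative only):

    public class ClampDemo {
      static final long MAX_PART_SIZE = (long) 5 * 1024 * 1024 * 1024;
      public static void main(String[] args) {
        long requested = 10L * 1024 * 1024 * 1024;            // hypothetical 10 GB setting
        long effective = Math.min(requested, MAX_PART_SIZE);  // clamped to 5368709120
        System.out.println(effective);
      }
    }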
@@ -74,6 +95,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
   public void storeFile(String key, File file, byte[] md5Hash)
     throws IOException {
 
+    if (multipartEnabled && file.length() >= multipartBlockSize) {
+      storeLargeFile(key, file, md5Hash);
+      return;
+    }
+
     BufferedInputStream in = null;
     try {
       in = new BufferedInputStream(new FileInputStream(file));
@@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     }
   }
 
+  public void storeLargeFile(String key, File file, byte[] md5Hash)
+    throws IOException {
+    S3Object object = new S3Object(key);
+    object.setDataInputFile(file);
+    object.setContentType("binary/octet-stream");
+    object.setContentLength(file.length());
+    if (md5Hash != null) {
+      object.setMd5Hash(md5Hash);
+    }
+
+    List<StorageObject> objectsToUploadAsMultipart =
+        new ArrayList<StorageObject>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+          objectsToUploadAsMultipart, null);
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    } catch (Exception e) {
+      throw new S3Exception(e);
+    }
+  }
+
   @Override
   public void storeEmptyFile(String key) throws IOException {
     try {
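storeLargeFile delegates the actual splitting and per-part PUTs to jets3t's MultipartUtils, sized by multipartBlockSize. Assuming parts are cut at the block size, the part count for a given file follows the same arithmetic the copy path uses further down; a sketch with assumed sizes (not part of the patch):

    public class PartCountDemo {
      public static void main(String[] args) {
        long fileLen   = 1024L * 1024 * 1024;  // hypothetical 1 GB file
        long blockSize = 64L * 1024 * 1024;    // default fs.s3n.multipart.uploads.block.size
        long parts = fileLen / blockSize + (fileLen % blockSize > 0 ? 1 : 0);
        System.out.println(parts);  // 16
      }
    }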
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       }
       S3Object object = s3Service.getObject(bucket.getName(), key);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       S3Object object = s3Service.getObject(bucket, key, null, null, null,
                                             null, byteRangeStart, null);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
         LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
       }
       s3Service.deleteObject(bucket, key);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
+    } catch (ServiceException e) {
+      handleServiceException(key, e);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleServiceException(e);
     }
   }
 
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
       }
+      if (multipartEnabled) {
+        S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+            null, null, null);
+        if (multipartCopyBlockSize > 0 &&
+            object.getContentLength() > multipartCopyBlockSize) {
+          copyLargeFile(object, dstKey);
+          return;
+        }
+      }
       s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
           new S3Object(dstKey), false);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(srcKey, e);
+    } catch (ServiceException e) {
+      handleServiceException(srcKey, e);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+            null, null, null, null, byteRangeStart,
+            byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
     } catch (ServiceException e) {
       handleServiceException(e);
     }
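A worked example of the partCount and last-part math above, using the sizes the new xlarge test exercises: a 5 GB + 1 byte source with the default 5 GB copy block gives two parts, the second covering a single byte; when the length is an exact multiple, the modulo is 0 and the else-branch falls back to a full block. Illustrative sketch, not part of the patch:

    public class CopyPartMathDemo {
      public static void main(String[] args) {
        long contentLength = 5368709121L;  // 5 GB + 1 byte, as in testExtraLargeUpload
        long blockSize     = 5368709120L;  // default fs.s3n.multipart.copy.block.size
        long partCount = contentLength / blockSize
            + (contentLength % blockSize > 0 ? 1 : 0);  // 2
        long lastPart = contentLength % blockSize;      // 1 byte
        if (lastPart == 0) {
          lastPart = blockSize;  // exact multiple: the final part is a full block
        }
        System.out.println(partCount + " parts, last part " + lastPart + " bytes");
      }
    }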
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       System.out.println(sb);
     }
 
-  private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
-    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+  private void handleServiceException(String key, ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getErrorCode())) {
       throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
     } else {
-      handleS3ServiceException(e);
+      handleServiceException(e);
     }
   }
 
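The handler refactoring works because, in the jets3t API used here, S3ServiceException is a subclass of ServiceException: collapsing the two catch blocks into one ServiceException handler, and switching getS3ErrorCode() to the inherited getErrorCode(), preserves the NoSuchKey-to-FileNotFoundException translation while also covering the new multipart code paths, which throw the broader type.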
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -524,6 +524,31 @@
   filesystem (s3n: URIs).</description>
 </property>
 
+<property>
+  <name>fs.s3n.multipart.uploads.enabled</name>
+  <value>false</value>
+  <description>Setting this property to true enables multipart uploads to
+  native S3 filesystem. When uploading a file, it is split into blocks
+  if the size is larger than fs.s3n.multipart.uploads.block.size.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.uploads.block.size</name>
+  <value>67108864</value>
+  <description>The block size for multipart uploads to native S3 filesystem.
+  Default size is 64MB.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.copy.block.size</name>
+  <value>5368709120</value>
+  <description>The block size for multipart copy in native S3 filesystem.
+  Default size is 5GB.
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
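Taken together, the three properties drive two independent thresholds: the upload block size decides when storeFile switches to storeLargeFile, and the copy block size decides when copy switches to copyLargeFile. A compact illustration using the defaults above and an assumed 200 MB object (note the patch uses >= for uploads but > for copies):

    public class ThresholdDemo {
      public static void main(String[] args) {
        long uploadBlock = 67108864L;           // fs.s3n.multipart.uploads.block.size
        long copyBlock   = 5368709120L;         // fs.s3n.multipart.copy.block.size
        long fileLen     = 200L * 1024 * 1024;  // hypothetical 200 MB object

        // Assumes fs.s3n.multipart.uploads.enabled is true:
        boolean multipartUpload = fileLen >= uploadBlock;  // true  -> storeLargeFile
        boolean multipartCopy   = fileLen >  copyBlock;    // false -> plain copyObject
        System.out.println(multipartUpload + " " + multipartCopy);
      }
    }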
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class TestJets3tNativeFileSystemStore {
+  private Configuration conf;
+  private Jets3tNativeFileSystemStore store;
+  private NativeS3FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    store = new Jets3tNativeFileSystemStore();
+    fs = new NativeS3FileSystem(store);
+    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+    fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {}
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+    assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+    assumeNotNull(conf.get("test.fs.s3n.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.s3n.multipart.uploads.block.size,
+    // we'll use a multipart upload copy
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    // Depending on if this file is over 5 GB or not,
+    // rename will cause a multipart upload copy
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from S3 and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {copyLen++;}
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, regular copy
+    writeRenameReadCompare(new Path("/test/medium"), 33554432); // 32 MB
+  }
+
+  @Test
+  public void testExtraLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+  }
+}
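These tests are gated by JUnit assumptions rather than assertions: unless fs.s3n.awsAccessKeyId, fs.s3n.awsSecretAccessKey, and test.fs.s3n.name are present in the configuration, checkSettings() causes the whole class to be skipped instead of failed. Note that testExtraLargeUpload writes just over 5 GB one byte at a time against a live bucket, so it is expected to be slow and to incur real S3 transfer costs.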
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Speed up the s3native jets3t test
+
+s3service.max-thread-count=10
+threaded-service.max-thread-count=10