diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 96d0464b651..810c05670f4 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -37,6 +37,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove
     it in trunk. (Haohui Mai via jing9)
 
+    HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
+    Akira AJISAKA via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
index 49266187057..f6a88338fe0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
@@ -28,6 +28,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceException;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageObjectsChunk;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
 import org.jets3t.service.model.StorageObject;
 import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
 
   private S3Service s3Service;
   private S3Bucket bucket;
+
+  private long multipartBlockSize;
+  private boolean multipartEnabled;
+  private long multipartCopyBlockSize;
+  static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+
   public static final Log LOG =
       LogFactory.getLog(Jets3tNativeFileSystemStore.class);
 
@@ -67,13 +79,27 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     } catch (S3ServiceException e) {
       handleS3ServiceException(e);
     }
+    multipartEnabled =
+        conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+    multipartBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+        MAX_PART_SIZE);
+    multipartCopyBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+        MAX_PART_SIZE);
+
     bucket = new S3Bucket(uri.getHost());
   }
 
   @Override
   public void storeFile(String key, File file, byte[] md5Hash)
     throws IOException {
-    
+
+    if (multipartEnabled && file.length() >= multipartBlockSize) {
+      storeLargeFile(key, file, md5Hash);
+      return;
+    }
+
     BufferedInputStream in = null;
     try {
       in = new BufferedInputStream(new FileInputStream(file));
@@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     }
   }
 
+  public void storeLargeFile(String key, File file, byte[] md5Hash)
+      throws IOException {
+    S3Object object = new S3Object(key);
+    object.setDataInputFile(file);
+    object.setContentType("binary/octet-stream");
+    object.setContentLength(file.length());
+    if (md5Hash != null) {
+      object.setMd5Hash(md5Hash);
+    }
+
+    List<StorageObject> objectsToUploadAsMultipart =
+        new ArrayList<StorageObject>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+          objectsToUploadAsMultipart, null);
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    } catch (Exception e) {
+      throw new S3Exception(e);
+    }
+  }
+
   @Override
   public void storeEmptyFile(String key) throws IOException {
     try {
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       }
       S3Object object = s3Service.getObject(bucket.getName(), key);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       S3Object object = s3Service.getObject(bucket, key, null, null, null,
                                             null, byteRangeStart, null);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
       }
       s3Service.deleteObject(bucket, key);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
+    } catch (ServiceException e) {
+      handleServiceException(key, e);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleServiceException(e);
     }
   }
 
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
     }
+    if (multipartEnabled) {
+      S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+                                                   null, null, null);
+      if (multipartCopyBlockSize > 0 &&
+          object.getContentLength() > multipartCopyBlockSize) {
+        copyLargeFile(object, dstKey);
+        return;
+      }
+    }
      s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
          new S3Object(dstKey), false);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(srcKey, e);
+    } catch (ServiceException e) {
+      handleServiceException(srcKey, e);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+             null, null, null, null, byteRangeStart,
+             byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
     } catch (ServiceException e) {
       handleServiceException(e);
     }
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     System.out.println(sb);
   }
 
-  private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
-    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+  private void handleServiceException(String key, ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getErrorCode())) {
       throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
     } else {
-      handleS3ServiceException(e);
+      handleServiceException(e);
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 16db6f9773e..c1abc8163c3 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -524,6 +524,31 @@
   filesystem (s3n: URIs).</description>
 </property>
 
+<property>
+  <name>fs.s3n.multipart.uploads.enabled</name>
+  <value>false</value>
+  <description>Setting this property to true enables multipart uploads to the
+  native S3 filesystem. When uploading a file, it is split into blocks
+  if the size is larger than fs.s3n.multipart.uploads.block.size.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.uploads.block.size</name>
+  <value>67108864</value>
+  <description>The block size for multipart uploads to native S3 filesystem.
+  Default size is 64MB.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.copy.block.size</name>
+  <value>5368709120</value>
+  <description>The block size for multipart copy in native S3 filesystem.
+  Default size is 5GB.
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
new file mode 100644
index 00000000000..b1078a45144
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class TestJets3tNativeFileSystemStore {
+  private Configuration conf;
+  private Jets3tNativeFileSystemStore store;
+  private NativeS3FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    store = new Jets3tNativeFileSystemStore();
+    fs = new NativeS3FileSystem(store);
+    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+    fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {}
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+    assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+    assumeNotNull(conf.get("test.fs.s3n.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.s3n.multipart.uploads.block.size,
+    // we'll use a multipart upload copy
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    // Depending on if this file is over 5 GB or not,
+    // rename will cause a multipart upload copy
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from S3 and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {copyLen++;}
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, regular copy
+    writeRenameReadCompare(new Path("/test/medium"), 33554432);    // 32 MB
+  }
+
+  @Test
+  public void testExtraLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties b/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
new file mode 100644
index 00000000000..09cc46396ab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Speed up the s3native jets3t test
+
+s3service.max-thread-count=10
+threaded-service.max-thread-count=10
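
Note for testers: TestJets3tNativeFileSystemStore is skipped (via JUnit assumptions) unless S3 credentials and a test bucket are configured. A minimal core-site.xml sketch follows; the credential values and bucket URI are placeholders, and the property names are exactly those read by the patch and its test:

<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY_ID</value> <!-- placeholder, not a real key -->
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_SECRET_ACCESS_KEY</value> <!-- placeholder, not a real key -->
  </property>
  <property>
    <!-- Scratch bucket the test writes to and purges -->
    <name>test.fs.s3n.name</name>
    <value>s3n://your-test-bucket/</value>
  </property>
  <property>
    <!-- Off by default; the test enables it programmatically as well -->
    <name>fs.s3n.multipart.uploads.enabled</name>
    <value>true</value>
  </property>
</configuration>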