HADOOP-17851. S3A to support user-specified content encoding (#3498)

The option fs.s3a.object.content.encoding declares the content encoding to be set on files when they are written; this is served up in the "Content-Encoding" HTTP header when reading objects back in.

This is useful for people loading the data into other tools in the AWS ecosystem that don't use file extensions to infer compression type (e.g. serving compressed files from S3 or importing into RDS).
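
As a rough usage sketch: the option is read when the filesystem instance is created, so it is set in the configuration up front; the bucket and key below are placeholders, and the data still has to be compressed by the writer, since the option only declares the encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ContentEncodingWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Constants.CONTENT_ENCODING: applied to every object this filesystem
    // instance writes, except directory markers.
    conf.set("fs.s3a.object.content.encoding", "gzip");

    // Placeholder bucket and key.
    Path path = new Path("s3a://example-bucket/data/events.json.gz");
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
        GZIPOutputStream gz = new GZIPOutputStream(fs.create(path, true))) {
      // The bytes must really be gzip data: the option only sets the
      // object's Content-Encoding header, it does not compress anything.
      gz.write("{\"hello\":\"world\"}\n".getBytes(StandardCharsets.UTF_8));
    }
  }
}
```

Since the encoding is fixed when the filesystem instance is initialised, it applies to every file that instance writes; per-bucket configuration can scope it to buckets that only hold compressed data.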

Contributed by: Holden Karau

Change-Id: Ice0da75b516370f51f79e45f391d46c5c7aa4ce4
Steve Loughran 2021-09-29 13:42:07 +01:00
parent 55f4baf3e4
commit ebdd8771c5
7 changed files with 171 additions and 5 deletions


@@ -413,6 +413,12 @@ public final class Constants {
public static final String CANNED_ACL = "fs.s3a.acl.default";
public static final String DEFAULT_CANNED_ACL = "";
/**
* Content encoding: gzip, deflate, compress, br, etc.
* Value {@value}.
*/
public static final String CONTENT_ENCODING = "fs.s3a.object.content.encoding";
// should we try to purge old multipart uploads when starting up
public static final String PURGE_EXISTING_MULTIPART =
"fs.s3a.multipart.purge";


@@ -937,12 +937,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// request factory.
initCannedAcls(getConf());
// Any encoding type
String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
.withEncryptionSecrets(requireNonNull(encryptionSecrets))
.withMultipartPartCountLimit(partCountLimit)
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.build();
}


@@ -100,6 +100,12 @@ public interface RequestFactory {
*/
S3AEncryptionMethods getServerSideEncryptionAlgorithm();
/**
* Get the content encoding (e.g. gzip) or return null if none.
* @return content encoding
*/
String getContentEncoding();
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:


@@ -83,7 +83,7 @@ public class HeaderProcessing extends AbstractStoreOperation {
XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
/**
* Standard HTTP header found on some S3 objects: {@value}.
* Content encoding; can be configured: {@value}.
*/
public static final String XA_CONTENT_ENCODING =
XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;


@@ -118,6 +118,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final PrepareRequest requestPreparer;
/**
* Content encoding (null for none).
*/
private final String contentEncoding;
/**
* Constructor.
* @param builder builder with all the configuration.
@@ -130,6 +135,7 @@ public class RequestFactoryImpl implements RequestFactory {
this.multipartPartCountLimit = builder.multipartPartCountLimit;
this.requesterPays = builder.requesterPays;
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
}
/**
@@ -193,6 +199,15 @@ public class RequestFactoryImpl implements RequestFactory {
return encryptionSecrets.getEncryptionMethod();
}
/**
* Get the content encoding (e.g. gzip) or return null if none.
* @return content encoding
*/
@Override
public String getContentEncoding() {
return contentEncoding;
}
/**
* Sets server side encryption parameters to the part upload
* request when encryption is enabled.
@@ -236,13 +251,18 @@ public class RequestFactoryImpl implements RequestFactory {
/**
* Set the optional metadata for an object being created or copied.
* @param metadata to update.
* @param isDirectoryMarker is this for a directory marker?
*/
protected void setOptionalObjectMetadata(ObjectMetadata metadata) {
protected void setOptionalObjectMetadata(ObjectMetadata metadata,
boolean isDirectoryMarker) {
final S3AEncryptionMethods algorithm
= getServerSideEncryptionAlgorithm();
if (S3AEncryptionMethods.SSE_S3 == algorithm) {
metadata.setSSEAlgorithm(algorithm.getMethod());
}
if (contentEncoding != null && !isDirectoryMarker) {
metadata.setContentEncoding(contentEncoding);
}
}
/**
@@ -255,8 +275,21 @@ public class RequestFactoryImpl implements RequestFactory {
*/
@Override
public ObjectMetadata newObjectMetadata(long length) {
return createObjectMetadata(length, false);
}
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
*
* @param length length of data to set in header; Ignored if negative
* @param isDirectoryMarker is this for a directory marker?
* @return a new metadata instance
*/
private ObjectMetadata createObjectMetadata(long length, boolean isDirectoryMarker) {
final ObjectMetadata om = new ObjectMetadata();
setOptionalObjectMetadata(om);
setOptionalObjectMetadata(om, isDirectoryMarker);
if (length >= 0) {
om.setContentLength(length);
}
@@ -271,7 +304,7 @@ public class RequestFactoryImpl implements RequestFactory {
new CopyObjectRequest(getBucket(), srcKey, getBucket(), dstKey);
ObjectMetadata dstom = newObjectMetadata(srcom.getContentLength());
HeaderProcessing.cloneObjectMetadata(srcom, dstom);
setOptionalObjectMetadata(dstom);
setOptionalObjectMetadata(dstom, false);
copyEncryptionParameters(srcom, copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
@@ -371,7 +404,7 @@ public class RequestFactoryImpl implements RequestFactory {
}
};
// preparation happens in here
final ObjectMetadata md = newObjectMetadata(0L);
final ObjectMetadata md = createObjectMetadata(0L, true);
md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
PutObjectRequest putObjectRequest =
newPutObjectRequest(key, md, im);
@@ -585,6 +618,9 @@ public class RequestFactoryImpl implements RequestFactory {
/** Requester Pays flag. */
private boolean requesterPays = false;
/** Content Encoding. */
private String contentEncoding;
/**
* Multipart limit.
*/
@@ -606,6 +642,16 @@ public class RequestFactoryImpl implements RequestFactory {
return new RequestFactoryImpl(this);
}
/**
* Content encoding.
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withContentEncoding(final String value) {
contentEncoding = value;
return this;
}
/**
* Target bucket.
* @param value new value


@@ -1057,6 +1057,17 @@ options are covered in [Testing](./testing.md).
client has permission to read the bucket.
</description>
</property>
<property>
<name>fs.s3a.object.content.encoding</name>
<value></value>
<description>
Content encoding: gzip, deflate, compress, br, etc.
This will be set in the "Content-Encoding" header of the object,
and returned in HTTP HEAD/GET requests.
</description>
</property>
```
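
To illustrate the property documented above, a minimal read-back sketch: S3A exposes object HTTP headers as XAttrs, so the stored encoding can be checked through the filesystem API, which mirrors what the new integration test below asserts. The path is a placeholder.

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;

public class ContentEncodingReadSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder path: an object written with the content encoding option set.
    Path path = new Path("s3a://example-bucket/data/events.json.gz");
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), new Configuration())) {
      // Object headers are surfaced as XAttrs; the Content-Encoding header
      // is published under HeaderProcessing.XA_CONTENT_ENCODING.
      Map<String, byte[]> xAttrs = fs.getXAttrs(path);
      String encoding = HeaderProcessing.decodeBytes(
          xAttrs.get(HeaderProcessing.XA_CONTENT_ENCODING));
      System.out.println("Content-Encoding of " + path + " = " + encoding);
    }
  }
}
```

Any S3 client issuing a HEAD or GET on the same object sees the same value in the Content-Encoding response header.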
## <a name="retry_and_recovery"></a>Retry and Recovery


@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.CONTENT_ENCODING;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_ENCODING;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
/**
* Tests of content encoding object meta data.
*/
public class ITestS3AContentEncoding extends AbstractS3ATestBase {
private static final String GZIP = "gzip";
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf, CONTENT_ENCODING);
conf.set(CONTENT_ENCODING, GZIP);
return conf;
}
@Test
public void testCreatedObjectsHaveEncoding() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with content encoding enabled, directories do not have
// encoding.
Assertions.assertThat(getEncoding(dir))
.describedAs("Encoding of object %s", dir)
.isNull();
Path path = new Path(dir, "1");
ContractTestUtils.touch(fs, path);
assertObjectHasEncoding(path);
Path path2 = new Path(dir, "2");
fs.rename(path, path2);
assertObjectHasEncoding(path2);
}
/**
* Assert that a given object has gzip encoding specified.
* @param path path
*
*/
private void assertObjectHasEncoding(Path path) throws Throwable {
Assertions.assertThat(getEncoding(path))
.describedAs("Encoding of object %s", path)
.isEqualTo(GZIP);
}
/**
* Get the encoding of a path.
* @param path path
* @return encoding string or null
* @throws IOException IO Failure.
*/
private String getEncoding(Path path) throws IOException {
S3AFileSystem fs = getFileSystem();
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
return decodeBytes(xAttrs.get(XA_CONTENT_ENCODING));
}
}