HADOOP-18339. S3A storage class option only picked up when buffering writes to disk. (#4669)
Follow-up to HADOOP-12020 Support configuration of different S3 storage classes; S3 storage class is now set when buffering to heap/bytebuffers, and when creating directory markers Contributed by Monthon Klongklaew
This commit is contained in:
parent
3a6c8ff8bb
commit
9dffa65021
|
@ -408,6 +408,9 @@ public class RequestFactoryImpl implements RequestFactory {
|
||||||
inputStream, metadata);
|
inputStream, metadata);
|
||||||
setOptionalPutRequestParameters(putObjectRequest);
|
setOptionalPutRequestParameters(putObjectRequest);
|
||||||
putObjectRequest.setCannedAcl(cannedACL);
|
putObjectRequest.setCannedAcl(cannedACL);
|
||||||
|
if (storageClass != null) {
|
||||||
|
putObjectRequest.setStorageClass(storageClass);
|
||||||
|
}
|
||||||
return prepareRequest(putObjectRequest);
|
return prepareRequest(putObjectRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,19 +419,22 @@ public class RequestFactoryImpl implements RequestFactory {
|
||||||
String key = directory.endsWith("/")
|
String key = directory.endsWith("/")
|
||||||
? directory
|
? directory
|
||||||
: (directory + "/");
|
: (directory + "/");
|
||||||
// an input stream which is laways empty
|
// an input stream which is always empty
|
||||||
final InputStream im = new InputStream() {
|
final InputStream inputStream = new InputStream() {
|
||||||
@Override
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// preparation happens in here
|
// preparation happens in here
|
||||||
final ObjectMetadata md = createObjectMetadata(0L, true);
|
final ObjectMetadata metadata = createObjectMetadata(0L, true);
|
||||||
md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
|
metadata.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
|
||||||
PutObjectRequest putObjectRequest =
|
|
||||||
newPutObjectRequest(key, md, null, im);
|
PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
|
||||||
return putObjectRequest;
|
inputStream, metadata);
|
||||||
|
setOptionalPutRequestParameters(putObjectRequest);
|
||||||
|
putObjectRequest.setCannedAcl(cannedACL);
|
||||||
|
return prepareRequest(putObjectRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,10 +19,14 @@
|
||||||
package org.apache.hadoop.fs.s3a;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import java.nio.file.AccessDeniedException;
|
import java.nio.file.AccessDeniedException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.assertj.core.api.Assertions;
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -30,6 +34,10 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
|
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER;
|
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
|
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
|
||||||
|
@ -43,13 +51,33 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
/**
|
/**
|
||||||
* Tests of storage class.
|
* Tests of storage class.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class ITestS3AStorageClass extends AbstractS3ATestBase {
|
public class ITestS3AStorageClass extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HADOOP-18339. Parameterized the test for different fast upload buffer types
|
||||||
|
* to ensure the storage class configuration works with all of them.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameters(name = "fast-upload-buffer-{0}")
|
||||||
|
public static Collection<Object[]> params() {
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{FAST_UPLOAD_BUFFER_DISK},
|
||||||
|
{FAST_UPLOAD_BUFFER_ARRAY}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String fastUploadBufferType;
|
||||||
|
|
||||||
|
public ITestS3AStorageClass(String fastUploadBufferType) {
|
||||||
|
this.fastUploadBufferType = fastUploadBufferType;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Configuration createConfiguration() {
|
protected Configuration createConfiguration() {
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
disableFilesystemCaching(conf);
|
disableFilesystemCaching(conf);
|
||||||
removeBaseAndBucketOverrides(conf, STORAGE_CLASS);
|
removeBaseAndBucketOverrides(conf, STORAGE_CLASS, FAST_UPLOAD_BUFFER);
|
||||||
|
conf.set(FAST_UPLOAD_BUFFER, fastUploadBufferType);
|
||||||
|
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue