Move TransferManager initialization to ClientFactory

This commit is contained in:
Alessandro Passaro 2022-11-28 14:43:27 +00:00 committed by Ahmar Suhail
parent 1ab7e6827d
commit 0c2fd28252
4 changed files with 106 additions and 37 deletions

View File

@ -64,6 +64,7 @@ import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -123,9 +124,6 @@ public class DefaultS3ClientFactory extends Configured
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
/** Bucket name. */
private String bucket;
/**
* Create the client by preparing the AwsConf configuration
* and then invoking {@code buildAmazonS3Client()}.
@ -136,7 +134,7 @@ public class DefaultS3ClientFactory extends Configured
final URI uri,
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
bucket = uri.getHost();
String bucket = uri.getHost();
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(conf,
bucket,
@ -172,6 +170,7 @@ public class DefaultS3ClientFactory extends Configured
.equals(encryptionMethods.getMethod())) {
return buildAmazonS3EncryptionClient(
awsConf,
bucket,
parameters);
} else {
return buildAmazonS3Client(
@ -192,7 +191,7 @@ public class DefaultS3ClientFactory extends Configured
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
bucket = uri.getHost();
String bucket = uri.getHost();
ApacheHttpClient.Builder httpClientBuilder = AWSClientConfig
.createHttpClientBuilder(conf)
@ -208,7 +207,7 @@ public class DefaultS3ClientFactory extends Configured
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
bucket = uri.getHost();
String bucket = uri.getHost();
NettyNioAsyncHttpClient.Builder httpClientBuilder = AWSClientConfig
.createAsyncHttpClientBuilder(conf)
.proxyConfiguration(AWSClientConfig.createAsyncProxyConfiguration(conf, bucket));
@ -217,6 +216,26 @@ public class DefaultS3ClientFactory extends Configured
.build();
}
@Override
public S3TransferManager createS3TransferManager(
final URI uri,
final S3ClientCreationParameters parameters)
throws IOException {
Configuration conf = getConf();
String bucket = uri.getHost();
Region region = getS3Region(conf.getTrimmed(AWS_REGION), bucket,
parameters.getCredentialSet());
return S3TransferManager.builder()
.s3ClientConfiguration(clientConfiguration ->
clientConfiguration
.minimumPartSizeInBytes(parameters.getMinimumPartSize())
.credentialsProvider(parameters.getCredentialSet())
.region(region))
.transferConfiguration(transferConfiguration ->
transferConfiguration.executor(parameters.getTransferManagerExecutor()))
.build();
}
/**
* Configure a sync or async S3 client builder.
* This method handles all shared configuration.
@ -297,13 +316,14 @@ public class DefaultS3ClientFactory extends Configured
* {@link AmazonS3EncryptionV2} if CSE is enabled.
*
* @param awsConf AWS configuration.
* @param bucket bucket name.
* @param parameters parameters.
*
* @return new AmazonS3 client.
* @throws IOException if lookupPassword() has any problem.
*/
protected AmazonS3 buildAmazonS3EncryptionClient(
final ClientConfiguration awsConf,
final String bucket,
final S3ClientCreationParameters parameters) throws IOException {
AmazonS3 client;

View File

@ -995,17 +995,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
.withUserAgentSuffix(uaSuffix)
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withExecutionInterceptors(auditManager.createExecutionInterceptors());
.withExecutionInterceptors(auditManager.createExecutionInterceptors())
.withMinimumPartSize(partSize)
.withTransferManagerExecutor(unboundedThreadPool);
s3Client = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3ClientV2(getUri(),
parameters);
s3AsyncClient = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3AsyncClient(getUri(),
parameters);
initTransferManager();
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3ClientV2(getUri(), parameters);
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
transferManager = clientFactory.createS3TransferManager(getUri(), parameters);
}
/**
@ -1179,23 +1176,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return encryptionSecrets;
}
private void initTransferManager() {
// TODO: move to client factory?
transferManager = S3TransferManager.builder()
.s3ClientConfiguration(clientConfiguration ->
// TODO: Temporarily using EU_WEST_1 as the region, ultimately this can maybe moved to
// the DefaultS3ClientFactory and use the region resolution logic there. Wait till we
// finalise region logic before making any changes here. Also add other
// configuration options?
clientConfiguration
.minimumPartSizeInBytes(partSize)
.credentialsProvider(credentials)
.region(Region.EU_WEST_1))
.transferConfiguration(transferConfiguration ->
transferConfiguration.executor(unboundedThreadPool)) // TODO: double-check
.build();
}
private void initCannedAcls(Configuration conf) {
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
if (!cannedACLName.isEmpty()) {

View File

@ -24,19 +24,20 @@ import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import com.amazonaws.monitoring.MonitoringListener;
import com.amazonaws.services.s3.AmazonS3;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import com.amazonaws.monitoring.MonitoringListener;
import com.amazonaws.services.s3.AmazonS3;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT;
/**
@ -93,6 +94,17 @@ public interface S3ClientFactory {
S3AsyncClient createS3AsyncClient(URI uri,
S3ClientCreationParameters parameters) throws IOException;
/**
* Creates a new {@link S3TransferManager}.
*
* @param uri S3A file system URI
* @param parameters parameter object
* @return S3 transfer manager
* @throws IOException on any IO problem
*/
S3TransferManager createS3TransferManager(URI uri,
S3ClientCreationParameters parameters) throws IOException;
/**
* Settings for the S3 Client.
* Implemented as a class to pass in so that adding
@ -154,6 +166,16 @@ public interface S3ClientFactory {
*/
private URI pathUri;
/**
* Minimum part size for transfer parts.
*/
private long minimumPartSize;
/**
* Executor that the transfer manager will use to execute background tasks.
*/
private Executor transferManagerExecutor;
/**
* List of execution interceptors to include in the chain
* of interceptors in the SDK.
@ -324,5 +346,43 @@ public interface S3ClientFactory {
pathUri = value;
return this;
}
/**
* Get the minimum part size for transfer parts.
* @return part size
*/
public long getMinimumPartSize() {
return minimumPartSize;
}
/**
* Set the minimum part size for transfer parts.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withMinimumPartSize(
final long value) {
minimumPartSize = value;
return this;
}
/**
* Get the executor that the transfer manager will use to execute background tasks.
* @return part size
*/
public Executor getTransferManagerExecutor() {
return transferManagerExecutor;
}
/**
* Set the executor that the transfer manager will use to execute background tasks.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withTransferManagerExecutor(
final Executor value) {
transferManagerExecutor = value;
return this;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@ -28,6 +29,7 @@ import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
/**
* An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
@ -64,4 +66,11 @@ public class MockS3ClientFactory implements S3ClientFactory {
S3AsyncClient s3 = mock(S3AsyncClient.class);
return s3;
}
@Override
public S3TransferManager createS3TransferManager(URI uri, S3ClientCreationParameters parameters)
throws IOException {
S3TransferManager tm = mock(S3TransferManager.class);
return tm;
}
}