HADOOP-11171 Enable using a proxy server to connect to S3a. (Thomas Demoor via stevel)
This commit is contained in:
parent
ed2f8a1ba0
commit
15865587ec
|
@ -127,6 +127,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HADOOP-11261 Set custom endpoint for S3A. (Thomas Demoor via stevel)
|
HADOOP-11261 Set custom endpoint for S3A. (Thomas Demoor via stevel)
|
||||||
|
|
||||||
|
HADOOP-11171 Enable using a proxy server to connect to S3a.
|
||||||
|
(Thomas Demoor via stevel)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
||||||
|
|
|
@ -31,6 +31,13 @@ public class Constants {
|
||||||
|
|
||||||
//use a custom endpoint?
|
//use a custom endpoint?
|
||||||
public static final String ENDPOINT = "fs.s3a.endpoint";
|
public static final String ENDPOINT = "fs.s3a.endpoint";
|
||||||
|
//connect to s3 through a proxy server?
|
||||||
|
public static final String PROXY_HOST = "fs.s3a.proxy.host";
|
||||||
|
public static final String PROXY_PORT = "fs.s3a.proxy.port";
|
||||||
|
public static final String PROXY_USERNAME = "fs.s3a.proxy.username";
|
||||||
|
public static final String PROXY_PASSWORD = "fs.s3a.proxy.password";
|
||||||
|
public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
|
||||||
|
public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";
|
||||||
|
|
||||||
// number of times we should retry errors
|
// number of times we should retry errors
|
||||||
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
|
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
|
||||||
|
|
|
@ -169,13 +169,54 @@ public class S3AFileSystem extends FileSystem {
|
||||||
ClientConfiguration awsConf = new ClientConfiguration();
|
ClientConfiguration awsConf = new ClientConfiguration();
|
||||||
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
||||||
DEFAULT_MAXIMUM_CONNECTIONS));
|
DEFAULT_MAXIMUM_CONNECTIONS));
|
||||||
awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS,
|
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||||
DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP);
|
DEFAULT_SECURE_CONNECTIONS);
|
||||||
|
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||||
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
||||||
DEFAULT_MAX_ERROR_RETRIES));
|
DEFAULT_MAX_ERROR_RETRIES));
|
||||||
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
||||||
DEFAULT_SOCKET_TIMEOUT));
|
DEFAULT_SOCKET_TIMEOUT));
|
||||||
|
|
||||||
|
String proxyHost = conf.getTrimmed(PROXY_HOST,"");
|
||||||
|
int proxyPort = conf.getInt(PROXY_PORT, -1);
|
||||||
|
if (!proxyHost.isEmpty()) {
|
||||||
|
awsConf.setProxyHost(proxyHost);
|
||||||
|
if (proxyPort >= 0) {
|
||||||
|
awsConf.setProxyPort(proxyPort);
|
||||||
|
} else {
|
||||||
|
if (secureConnections) {
|
||||||
|
LOG.warn("Proxy host set without port. Using HTTPS default 443");
|
||||||
|
awsConf.setProxyPort(443);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Proxy host set without port. Using HTTP default 80");
|
||||||
|
awsConf.setProxyPort(80);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
|
||||||
|
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
|
||||||
|
if ((proxyUsername == null) != (proxyPassword == null)) {
|
||||||
|
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
|
||||||
|
PROXY_PASSWORD + " set without the other.";
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
awsConf.setProxyUsername(proxyUsername);
|
||||||
|
awsConf.setProxyPassword(proxyPassword);
|
||||||
|
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
|
||||||
|
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
|
||||||
|
"domain {} as workstation {}", awsConf.getProxyHost(),
|
||||||
|
awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
|
||||||
|
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
|
||||||
|
awsConf.getProxyWorkstation());
|
||||||
|
}
|
||||||
|
} else if (proxyPort >= 0) {
|
||||||
|
String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
|
||||||
s3 = new AmazonS3Client(credentials, awsConf);
|
s3 = new AmazonS3Client(credentials, awsConf);
|
||||||
String endPoint = conf.getTrimmed(ENDPOINT,"");
|
String endPoint = conf.getTrimmed(ENDPOINT,"");
|
||||||
if (!endPoint.isEmpty()) {
|
if (!endPoint.isEmpty()) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import com.amazonaws.AmazonClientException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
@ -82,4 +83,98 @@ public class TestS3AConfiguration {
|
||||||
endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
|
endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestProxyConnection() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
|
||||||
|
conf.set(Constants.PROXY_HOST, "127.0.0.1");
|
||||||
|
conf.setInt(Constants.PROXY_PORT, 1);
|
||||||
|
String proxy =
|
||||||
|
conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a connection error for proxy server at " + proxy);
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
if (!e.getMessage().contains(proxy + " refused")) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestProxyPortWithoutHost() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
|
||||||
|
conf.setInt(Constants.PROXY_PORT, 1);
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a proxy configuration error");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
String msg = e.toString();
|
||||||
|
if (!msg.contains(Constants.PROXY_HOST) &&
|
||||||
|
!msg.contains(Constants.PROXY_PORT)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestAutomaticProxyPortSelection() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
|
||||||
|
conf.set(Constants.PROXY_HOST, "127.0.0.1");
|
||||||
|
conf.set(Constants.SECURE_CONNECTIONS, "true");
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a connection error for proxy server");
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
if (!e.getMessage().contains("443")) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conf.set(Constants.SECURE_CONNECTIONS, "false");
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a connection error for proxy server");
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
if (!e.getMessage().contains("80")) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestUsernameInconsistentWithPassword() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
|
||||||
|
conf.set(Constants.PROXY_HOST, "127.0.0.1");
|
||||||
|
conf.setInt(Constants.PROXY_PORT, 1);
|
||||||
|
conf.set(Constants.PROXY_USERNAME, "user");
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a connection error for proxy server");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
String msg = e.toString();
|
||||||
|
if (!msg.contains(Constants.PROXY_USERNAME) &&
|
||||||
|
!msg.contains(Constants.PROXY_PASSWORD)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
|
||||||
|
conf.set(Constants.PROXY_HOST, "127.0.0.1");
|
||||||
|
conf.setInt(Constants.PROXY_PORT, 1);
|
||||||
|
conf.set(Constants.PROXY_PASSWORD, "password");
|
||||||
|
try {
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
fail("Expected a connection error for proxy server");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
String msg = e.toString();
|
||||||
|
if (!msg.contains(Constants.PROXY_USERNAME) &&
|
||||||
|
!msg.contains(Constants.PROXY_PASSWORD)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue