HADOOP-15669. ABFS: Improve HTTPS Performance.

Contributed by Vishwajeet Dusane.
This commit is contained in:
Thomas Marquardt 2018-08-16 16:19:13 +00:00
parent cc5cc60c41
commit d6a4f39bd5
10 changed files with 375 additions and 28 deletions

View File

@ -1349,6 +1349,13 @@
<version>7.0.0</version>
</dependency>
<!--Wildfly openssl dependency is introduced by HADOOP-15669-->
<dependency>
<groupId>org.wildfly.openssl</groupId>
<artifactId>wildfly-openssl</artifactId>
<version>1.0.4.Final</version>
</dependency>
<dependency>
<groupId>org.threadly</groupId>
<artifactId>threadly</artifactId>

View File

@ -197,13 +197,18 @@
<artifactId>jackson-mapper-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.wildfly.openssl</groupId>
<artifactId>wildfly-openssl</artifactId>
<scope>compile</scope>
</dependency>
<!-- dependencies use for test only -->
<dependency>

View File

@ -44,6 +44,10 @@ import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE;
/**
* Configuration for Azure Blob FileSystem.
@ -270,6 +274,10 @@ public class AbfsConfiguration{
return this.userAgentId;
}
public SSLSocketFactoryEx.SSLChannelMode getPreferredSSLFactoryOption() {
return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
}
void validateStorageAccountKeys() throws InvalidConfigurationValueException {
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);

View File

@ -55,6 +55,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs.constants;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
/**
* Responsible to keep all the Azure Blob File System related configurations.
@ -57,5 +58,8 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= SSLSocketFactoryEx.SSLChannelMode.Default;
private FileSystemConfigurations() {}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
@ -27,6 +28,7 @@ import java.util.List;
import java.util.Locale;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,6 +37,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
@ -60,7 +63,19 @@ public class AbfsClient {
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = exponentialRetryPolicy;
this.userAgent = initializeUserAgent(abfsConfiguration);
String sslProviderName = null;
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
try {
SSLSocketFactoryEx.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();
} catch (IOException e) {
// Suppress exception. Failure to init SSLSocketFactoryEx would have only performance impact.
}
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
}
public String getFileSystem() {
@ -395,16 +410,26 @@ public class AbfsClient {
}
@VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration) {
final String userAgentComment = String.format(Locale.ROOT,
"(JavaJRE %s; %s %s)",
System.getProperty(JAVA_VERSION),
System.getProperty(OS_NAME)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING),
System.getProperty(OS_VERSION));
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
final String sslProviderName) {
StringBuilder sb = new StringBuilder();
sb.append("(JavaJRE ");
sb.append(System.getProperty(JAVA_VERSION));
sb.append("; ");
sb.append(
System.getProperty(OS_NAME).replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
sb.append(" ");
sb.append(System.getProperty(OS_VERSION));
if (sslProviderName != null && !sslProviderName.isEmpty()) {
sb.append("; ");
sb.append(sslProviderName);
}
sb.append(")");
final String userAgentComment = sb.toString();
String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix();
if (customUserAgentId != null && !customUserAgentId.isEmpty()) {
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId);
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s",
userAgentComment, customUserAgentId);
}
return String.format(CLIENT_VERSION + " %s", userAgentComment);
}

View File

@ -26,6 +26,10 @@ import java.net.URL;
import java.util.List;
import java.util.UUID;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
@ -174,6 +178,13 @@ public class AbfsHttpOperation {
this.clientRequestId = UUID.randomUUID().toString();
this.connection = openConnection();
if (this.connection instanceof HttpsURLConnection) {
HttpsURLConnection secureConn = (HttpsURLConnection) this.connection;
SSLSocketFactory sslSocketFactory = SSLSocketFactoryEx.getDefaultFactory();
if (sslSocketFactory != null) {
secureConn.setSSLSocketFactory(sslSocketFactory);
}
}
this.connection.setConnectTimeout(CONNECT_TIMEOUT);
this.connection.setReadTimeout(READ_TIMEOUT);

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wildfly.openssl.OpenSSLProvider;
/**
* Extension to use native OpenSSL library instead of JSSE for better
* performance.
*
*/
public class SSLSocketFactoryEx extends SSLSocketFactory {
/**
* Default indicates Ordered, preferred OpenSSL, if failed to load then fall
* back to Default_JSSE
*/
public enum SSLChannelMode {
OpenSSL,
Default,
Default_JSSE
}
private static SSLSocketFactoryEx instance = null;
private static final Logger LOG = LoggerFactory.getLogger(
SSLSocketFactoryEx.class);
private String providerName;
private SSLContext ctx;
private String[] ciphers;
private SSLChannelMode channelMode;
/**
* Initialize a singleton SSL socket factory.
*
* @param preferredMode applicable only if the instance is not initialized.
* @throws IOException
*/
public synchronized static void initializeDefaultFactory(
SSLChannelMode preferredMode) throws IOException {
if (instance == null) {
instance = new SSLSocketFactoryEx(preferredMode);
}
}
/**
* Singletone instance of the SSLSocketFactory.
*
* SSLSocketFactory must be initialized with appropriate SSLChannelMode
* using initializeDefaultFactory method.
*
* @return instance of the SSLSocketFactory, instance must be initialized by
* initializeDefaultFactory.
*/
public static SSLSocketFactoryEx getDefaultFactory() {
return instance;
}
static {
OpenSSLProvider.register();
}
private SSLSocketFactoryEx(SSLChannelMode preferredChannelMode)
throws IOException {
try {
initializeSSLContext(preferredChannelMode);
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
} catch (KeyManagementException e) {
throw new IOException(e);
}
// Get list of supported cipher suits from the SSL factory.
SSLSocketFactory factory = ctx.getSocketFactory();
String[] defaultCiphers = factory.getSupportedCipherSuites();
String version = System.getProperty("java.version");
ciphers = (channelMode == SSLChannelMode.Default_JSSE
&& version.startsWith("1.8"))
? alterCipherList(defaultCiphers) : defaultCiphers;
providerName = ctx.getProvider().getName() + "-"
+ ctx.getProvider().getVersion();
}
private void initializeSSLContext(SSLChannelMode preferredChannelMode)
throws NoSuchAlgorithmException, KeyManagementException {
switch (preferredChannelMode) {
case Default:
try {
ctx = SSLContext.getInstance("openssl.TLS");
ctx.init(null, null, null);
channelMode = SSLChannelMode.OpenSSL;
} catch (NoSuchAlgorithmException e) {
LOG.warn("Failed to load OpenSSL. Falling back to the JSSE default.");
ctx = SSLContext.getDefault();
channelMode = SSLChannelMode.Default_JSSE;
}
break;
case OpenSSL:
ctx = SSLContext.getInstance("openssl.TLS");
ctx.init(null, null, null);
channelMode = SSLChannelMode.OpenSSL;
break;
case Default_JSSE:
ctx = SSLContext.getDefault();
channelMode = SSLChannelMode.Default_JSSE;
break;
default:
throw new AssertionError("Unknown channel mode: "
+ preferredChannelMode);
}
}
public String getProviderName() {
return providerName;
}
@Override
public String[] getDefaultCipherSuites() {
return ciphers.clone();
}
@Override
public String[] getSupportedCipherSuites() {
return ciphers.clone();
}
public Socket createSocket() throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory.createSocket();
configureSocket(ss);
return ss;
}
@Override
public Socket createSocket(Socket s, String host, int port,
boolean autoClose) throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory.createSocket(s, host, port, autoClose);
configureSocket(ss);
return ss;
}
@Override
public Socket createSocket(InetAddress address, int port,
InetAddress localAddress, int localPort)
throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory
.createSocket(address, port, localAddress, localPort);
configureSocket(ss);
return ss;
}
@Override
public Socket createSocket(String host, int port, InetAddress localHost,
int localPort) throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory
.createSocket(host, port, localHost, localPort);
configureSocket(ss);
return ss;
}
@Override
public Socket createSocket(InetAddress host, int port) throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory.createSocket(host, port);
configureSocket(ss);
return ss;
}
@Override
public Socket createSocket(String host, int port) throws IOException {
SSLSocketFactory factory = ctx.getSocketFactory();
SSLSocket ss = (SSLSocket) factory.createSocket(host, port);
configureSocket(ss);
return ss;
}
private void configureSocket(SSLSocket ss) throws SocketException {
ss.setEnabledCipherSuites(ciphers);
}
private String[] alterCipherList(String[] defaultCiphers) {
ArrayList<String> preferredSuits = new ArrayList<>();
// Remove GCM mode based ciphers from the supported list.
for (int i = 0; i < defaultCiphers.length; i++) {
if (defaultCiphers[i].contains("_GCM_")) {
LOG.debug("Removed Cipher - " + defaultCiphers[i]);
} else {
preferredSuits.add(defaultCiphers[i]);
}
}
ciphers = preferredSuits.toArray(new String[0]);
return ciphers;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidati
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
@ -41,7 +42,10 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.A
import org.apache.commons.codec.binary.Base64;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import org.junit.Test;
/**
@ -50,11 +54,11 @@ import org.junit.Test;
public class TestAbfsConfigurationFieldsValidation {
private AbfsConfiguration abfsConfiguration;
private static final String INT_KEY= "intKey";
private static final String LONG_KEY= "longKey";
private static final String STRING_KEY= "stringKey";
private static final String BASE64_KEY= "base64Key";
private static final String BOOLEAN_KEY= "booleanKey";
private static final String INT_KEY = "intKey";
private static final String LONG_KEY = "longKey";
private static final String STRING_KEY = "stringKey";
private static final String BASE64_KEY = "base64Key";
private static final String BOOLEAN_KEY = "booleanKey";
private static final int DEFAULT_INT = 4194304;
private static final int DEFAULT_LONG = 4194304;
@ -142,8 +146,26 @@ public class TestAbfsConfigurationFieldsValidation {
assertEquals(this.encodedAccountKey, accountKey);
}
@Test (expected = ConfigurationPropertyNotFoundException.class)
@Test(expected = ConfigurationPropertyNotFoundException.class)
public void testGetAccountKeyWithNonExistingAccountName() throws Exception {
abfsConfiguration.getStorageAccountKey("bogusAccountName");
}
@Test
public void testSSLSocketFactoryConfiguration() throws InvalidConfigurationValueException, IllegalAccessException {
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default, abfsConfiguration.getPreferredSSLFactoryOption());
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, abfsConfiguration.getPreferredSSLFactoryOption());
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, abfsConfiguration.getPreferredSSLFactoryOption());
Configuration configuration = new Configuration();
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.Default_JSSE);
AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration);
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, localAbfsConfiguration.getPreferredSSLFactoryOption());
configuration = new Configuration();
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.OpenSSL);
localAbfsConfiguration = new AbfsConfiguration(configuration);
assertEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
}
}

View File

@ -22,6 +22,7 @@ import java.net.URL;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
import org.junit.Assert;
import org.junit.Test;
@ -34,16 +35,29 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
*/
public final class TestAbfsClient {
private void validateUserAgent(String expectedPattern,
URL baseUrl,
AbfsConfiguration config,
boolean includeSSLProvider) {
AbfsClient client = new AbfsClient(baseUrl, null,
config, null);
String sslProviderName = null;
if (includeSSLProvider) {
sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();
}
String userAgent = client.initializeUserAgent(config, sslProviderName);
Pattern pattern = Pattern.compile(expectedPattern);
Assert.assertTrue(pattern.matcher(userAgent).matches());
}
@Test
public void verifyUnknownUserAgent() throws Exception {
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)";
final Configuration configuration = new Configuration();
configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
Assert.assertTrue(pattern.matcher(userAgent).matches());
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
abfsConfiguration, false);
}
@Test
@ -52,9 +66,19 @@ public final class TestAbfsClient {
final Configuration configuration = new Configuration();
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
Assert.assertTrue(pattern.matcher(userAgent).matches());
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
abfsConfiguration, false);
}
@Test
public void verifyUserAgentWithSSLProvider() throws Exception {
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+) SunJSSE-1.8\\) Partner Service";
final Configuration configuration = new Configuration();
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
configuration.set(ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY,
SSLSocketFactoryEx.SSLChannelMode.Default_JSSE.name());
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
validateUserAgent(expectedUserAgentPattern, new URL("https://azure.com"),
abfsConfiguration, true);
}
}