HADOOP-15669. ABFS: Improve HTTPS Performance.
Contributed by Vishwajeet Dusane.
This commit is contained in:
parent
e320b64367
commit
92e37a0b73
|
@ -1117,6 +1117,13 @@
|
||||||
<version>7.0.0</version>
|
<version>7.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!--Wildfly openssl dependency is introduced by HADOOP-15669-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.wildfly.openssl</groupId>
|
||||||
|
<artifactId>wildfly-openssl</artifactId>
|
||||||
|
<version>1.0.4.Final</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.threadly</groupId>
|
<groupId>org.threadly</groupId>
|
||||||
<artifactId>threadly</artifactId>
|
<artifactId>threadly</artifactId>
|
||||||
|
|
|
@ -184,13 +184,18 @@
|
||||||
<artifactId>jackson-mapper-asl</artifactId>
|
<artifactId>jackson-mapper-asl</artifactId>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.codehaus.jackson</groupId>
|
<groupId>org.codehaus.jackson</groupId>
|
||||||
<artifactId>jackson-core-asl</artifactId>
|
<artifactId>jackson-core-asl</artifactId>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.wildfly.openssl</groupId>
|
||||||
|
<artifactId>wildfly-openssl</artifactId>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- dependencies use for test only -->
|
<!-- dependencies use for test only -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -44,6 +44,10 @@ import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator
|
||||||
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
|
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
|
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
|
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration for Azure Blob FileSystem.
|
* Configuration for Azure Blob FileSystem.
|
||||||
|
@ -270,6 +274,10 @@ public class AbfsConfiguration{
|
||||||
return this.userAgentId;
|
return this.userAgentId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SSLSocketFactoryEx.SSLChannelMode getPreferredSSLFactoryOption() {
|
||||||
|
return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
|
||||||
|
}
|
||||||
|
|
||||||
void validateStorageAccountKeys() throws InvalidConfigurationValueException {
|
void validateStorageAccountKeys() throws InvalidConfigurationValueException {
|
||||||
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
|
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
|
||||||
ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
|
ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
|
||||||
|
|
|
@ -55,6 +55,7 @@ public final class ConfigurationKeys {
|
||||||
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
|
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
|
||||||
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
|
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
|
||||||
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
|
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
|
||||||
|
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
|
||||||
|
|
||||||
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
|
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
|
||||||
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
|
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs.constants;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responsible to keep all the Azure Blob File System related configurations.
|
* Responsible to keep all the Azure Blob File System related configurations.
|
||||||
|
@ -57,5 +58,8 @@ public final class FileSystemConfigurations {
|
||||||
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
|
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
|
||||||
public static final boolean DEFAULT_ENABLE_FLUSH = true;
|
public static final boolean DEFAULT_ENABLE_FLUSH = true;
|
||||||
|
|
||||||
|
public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
|
||||||
|
= SSLSocketFactoryEx.SSLChannelMode.Default;
|
||||||
|
|
||||||
private FileSystemConfigurations() {}
|
private FileSystemConfigurations() {}
|
||||||
}
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.azurebfs.services;
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -27,6 +28,7 @@ import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -35,6 +37,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||||
|
|
||||||
|
@ -60,7 +63,19 @@ public class AbfsClient {
|
||||||
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
|
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
|
||||||
this.abfsConfiguration = abfsConfiguration;
|
this.abfsConfiguration = abfsConfiguration;
|
||||||
this.retryPolicy = exponentialRetryPolicy;
|
this.retryPolicy = exponentialRetryPolicy;
|
||||||
this.userAgent = initializeUserAgent(abfsConfiguration);
|
|
||||||
|
String sslProviderName = null;
|
||||||
|
|
||||||
|
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
|
||||||
|
try {
|
||||||
|
SSLSocketFactoryEx.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Suppress exception. Failure to init SSLSocketFactoryEx would have only performance impact.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getFileSystem() {
|
public String getFileSystem() {
|
||||||
|
@ -395,16 +410,26 @@ public class AbfsClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
String initializeUserAgent(final AbfsConfiguration abfsConfiguration) {
|
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
|
||||||
final String userAgentComment = String.format(Locale.ROOT,
|
final String sslProviderName) {
|
||||||
"(JavaJRE %s; %s %s)",
|
StringBuilder sb = new StringBuilder();
|
||||||
System.getProperty(JAVA_VERSION),
|
sb.append("(JavaJRE ");
|
||||||
System.getProperty(OS_NAME)
|
sb.append(System.getProperty(JAVA_VERSION));
|
||||||
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING),
|
sb.append("; ");
|
||||||
System.getProperty(OS_VERSION));
|
sb.append(
|
||||||
|
System.getProperty(OS_NAME).replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
|
||||||
|
sb.append(" ");
|
||||||
|
sb.append(System.getProperty(OS_VERSION));
|
||||||
|
if (sslProviderName != null && !sslProviderName.isEmpty()) {
|
||||||
|
sb.append("; ");
|
||||||
|
sb.append(sslProviderName);
|
||||||
|
}
|
||||||
|
sb.append(")");
|
||||||
|
final String userAgentComment = sb.toString();
|
||||||
String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix();
|
String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix();
|
||||||
if (customUserAgentId != null && !customUserAgentId.isEmpty()) {
|
if (customUserAgentId != null && !customUserAgentId.isEmpty()) {
|
||||||
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId);
|
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s",
|
||||||
|
userAgentComment, customUserAgentId);
|
||||||
}
|
}
|
||||||
return String.format(CLIENT_VERSION + " %s", userAgentComment);
|
return String.format(CLIENT_VERSION + " %s", userAgentComment);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,10 @@ import java.net.URL;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import javax.net.ssl.HttpsURLConnection;
|
||||||
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
import org.codehaus.jackson.JsonFactory;
|
import org.codehaus.jackson.JsonFactory;
|
||||||
import org.codehaus.jackson.JsonParser;
|
import org.codehaus.jackson.JsonParser;
|
||||||
import org.codehaus.jackson.JsonToken;
|
import org.codehaus.jackson.JsonToken;
|
||||||
|
@ -174,6 +178,13 @@ public class AbfsHttpOperation {
|
||||||
this.clientRequestId = UUID.randomUUID().toString();
|
this.clientRequestId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
this.connection = openConnection();
|
this.connection = openConnection();
|
||||||
|
if (this.connection instanceof HttpsURLConnection) {
|
||||||
|
HttpsURLConnection secureConn = (HttpsURLConnection) this.connection;
|
||||||
|
SSLSocketFactory sslSocketFactory = SSLSocketFactoryEx.getDefaultFactory();
|
||||||
|
if (sslSocketFactory != null) {
|
||||||
|
secureConn.setSSLSocketFactory(sslSocketFactory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.connection.setConnectTimeout(CONNECT_TIMEOUT);
|
this.connection.setConnectTimeout(CONNECT_TIMEOUT);
|
||||||
this.connection.setReadTimeout(READ_TIMEOUT);
|
this.connection.setReadTimeout(READ_TIMEOUT);
|
||||||
|
|
|
@ -0,0 +1,240 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.utils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.SocketException;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.net.ssl.SSLSocket;
|
||||||
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.wildfly.openssl.OpenSSLProvider;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extension to use native OpenSSL library instead of JSSE for better
|
||||||
|
* performance.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class SSLSocketFactoryEx extends SSLSocketFactory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default indicates Ordered, preferred OpenSSL, if failed to load then fall
|
||||||
|
* back to Default_JSSE
|
||||||
|
*/
|
||||||
|
public enum SSLChannelMode {
|
||||||
|
OpenSSL,
|
||||||
|
Default,
|
||||||
|
Default_JSSE
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SSLSocketFactoryEx instance = null;
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
SSLSocketFactoryEx.class);
|
||||||
|
private String providerName;
|
||||||
|
private SSLContext ctx;
|
||||||
|
private String[] ciphers;
|
||||||
|
private SSLChannelMode channelMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize a singleton SSL socket factory.
|
||||||
|
*
|
||||||
|
* @param preferredMode applicable only if the instance is not initialized.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public synchronized static void initializeDefaultFactory(
|
||||||
|
SSLChannelMode preferredMode) throws IOException {
|
||||||
|
if (instance == null) {
|
||||||
|
instance = new SSLSocketFactoryEx(preferredMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Singletone instance of the SSLSocketFactory.
|
||||||
|
*
|
||||||
|
* SSLSocketFactory must be initialized with appropriate SSLChannelMode
|
||||||
|
* using initializeDefaultFactory method.
|
||||||
|
*
|
||||||
|
* @return instance of the SSLSocketFactory, instance must be initialized by
|
||||||
|
* initializeDefaultFactory.
|
||||||
|
*/
|
||||||
|
public static SSLSocketFactoryEx getDefaultFactory() {
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
OpenSSLProvider.register();
|
||||||
|
}
|
||||||
|
|
||||||
|
private SSLSocketFactoryEx(SSLChannelMode preferredChannelMode)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
initializeSSLContext(preferredChannelMode);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (KeyManagementException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get list of supported cipher suits from the SSL factory.
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
String[] defaultCiphers = factory.getSupportedCipherSuites();
|
||||||
|
String version = System.getProperty("java.version");
|
||||||
|
|
||||||
|
ciphers = (channelMode == SSLChannelMode.Default_JSSE
|
||||||
|
&& version.startsWith("1.8"))
|
||||||
|
? alterCipherList(defaultCiphers) : defaultCiphers;
|
||||||
|
|
||||||
|
providerName = ctx.getProvider().getName() + "-"
|
||||||
|
+ ctx.getProvider().getVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeSSLContext(SSLChannelMode preferredChannelMode)
|
||||||
|
throws NoSuchAlgorithmException, KeyManagementException {
|
||||||
|
switch (preferredChannelMode) {
|
||||||
|
case Default:
|
||||||
|
try {
|
||||||
|
ctx = SSLContext.getInstance("openssl.TLS");
|
||||||
|
ctx.init(null, null, null);
|
||||||
|
channelMode = SSLChannelMode.OpenSSL;
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
LOG.warn("Failed to load OpenSSL. Falling back to the JSSE default.");
|
||||||
|
ctx = SSLContext.getDefault();
|
||||||
|
channelMode = SSLChannelMode.Default_JSSE;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case OpenSSL:
|
||||||
|
ctx = SSLContext.getInstance("openssl.TLS");
|
||||||
|
ctx.init(null, null, null);
|
||||||
|
channelMode = SSLChannelMode.OpenSSL;
|
||||||
|
break;
|
||||||
|
case Default_JSSE:
|
||||||
|
ctx = SSLContext.getDefault();
|
||||||
|
channelMode = SSLChannelMode.Default_JSSE;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new AssertionError("Unknown channel mode: "
|
||||||
|
+ preferredChannelMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProviderName() {
|
||||||
|
return providerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getDefaultCipherSuites() {
|
||||||
|
return ciphers.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getSupportedCipherSuites() {
|
||||||
|
return ciphers.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Socket createSocket() throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory.createSocket();
|
||||||
|
configureSocket(ss);
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Socket createSocket(Socket s, String host, int port,
|
||||||
|
boolean autoClose) throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory.createSocket(s, host, port, autoClose);
|
||||||
|
|
||||||
|
configureSocket(ss);
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Socket createSocket(InetAddress address, int port,
|
||||||
|
InetAddress localAddress, int localPort)
|
||||||
|
throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory
|
||||||
|
.createSocket(address, port, localAddress, localPort);
|
||||||
|
|
||||||
|
configureSocket(ss);
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Socket createSocket(String host, int port, InetAddress localHost,
|
||||||
|
int localPort) throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory
|
||||||
|
.createSocket(host, port, localHost, localPort);
|
||||||
|
|
||||||
|
configureSocket(ss);
|
||||||
|
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Socket createSocket(InetAddress host, int port) throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory.createSocket(host, port);
|
||||||
|
|
||||||
|
configureSocket(ss);
|
||||||
|
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Socket createSocket(String host, int port) throws IOException {
|
||||||
|
SSLSocketFactory factory = ctx.getSocketFactory();
|
||||||
|
SSLSocket ss = (SSLSocket) factory.createSocket(host, port);
|
||||||
|
|
||||||
|
configureSocket(ss);
|
||||||
|
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void configureSocket(SSLSocket ss) throws SocketException {
|
||||||
|
ss.setEnabledCipherSuites(ciphers);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] alterCipherList(String[] defaultCiphers) {
|
||||||
|
|
||||||
|
ArrayList<String> preferredSuits = new ArrayList<>();
|
||||||
|
|
||||||
|
// Remove GCM mode based ciphers from the supported list.
|
||||||
|
for (int i = 0; i < defaultCiphers.length; i++) {
|
||||||
|
if (defaultCiphers[i].contains("_GCM_")) {
|
||||||
|
LOG.debug("Removed Cipher - " + defaultCiphers[i]);
|
||||||
|
} else {
|
||||||
|
preferredSuits.add(defaultCiphers[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ciphers = preferredSuits.toArray(new String[0]);
|
||||||
|
return ciphers;
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidati
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
|
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
|
||||||
|
@ -41,7 +42,10 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.A
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -50,11 +54,11 @@ import org.junit.Test;
|
||||||
public class TestAbfsConfigurationFieldsValidation {
|
public class TestAbfsConfigurationFieldsValidation {
|
||||||
private AbfsConfiguration abfsConfiguration;
|
private AbfsConfiguration abfsConfiguration;
|
||||||
|
|
||||||
private static final String INT_KEY= "intKey";
|
private static final String INT_KEY = "intKey";
|
||||||
private static final String LONG_KEY= "longKey";
|
private static final String LONG_KEY = "longKey";
|
||||||
private static final String STRING_KEY= "stringKey";
|
private static final String STRING_KEY = "stringKey";
|
||||||
private static final String BASE64_KEY= "base64Key";
|
private static final String BASE64_KEY = "base64Key";
|
||||||
private static final String BOOLEAN_KEY= "booleanKey";
|
private static final String BOOLEAN_KEY = "booleanKey";
|
||||||
private static final int DEFAULT_INT = 4194304;
|
private static final int DEFAULT_INT = 4194304;
|
||||||
private static final int DEFAULT_LONG = 4194304;
|
private static final int DEFAULT_LONG = 4194304;
|
||||||
|
|
||||||
|
@ -77,15 +81,15 @@ public class TestAbfsConfigurationFieldsValidation {
|
||||||
private int longField;
|
private int longField;
|
||||||
|
|
||||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = STRING_KEY,
|
@StringConfigurationValidatorAnnotation(ConfigurationKey = STRING_KEY,
|
||||||
DefaultValue = "default")
|
DefaultValue = "default")
|
||||||
private String stringField;
|
private String stringField;
|
||||||
|
|
||||||
@Base64StringConfigurationValidatorAnnotation(ConfigurationKey = BASE64_KEY,
|
@Base64StringConfigurationValidatorAnnotation(ConfigurationKey = BASE64_KEY,
|
||||||
DefaultValue = "base64")
|
DefaultValue = "base64")
|
||||||
private String base64Field;
|
private String base64Field;
|
||||||
|
|
||||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = BOOLEAN_KEY,
|
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = BOOLEAN_KEY,
|
||||||
DefaultValue = false)
|
DefaultValue = false)
|
||||||
private boolean boolField;
|
private boolean boolField;
|
||||||
|
|
||||||
public TestAbfsConfigurationFieldsValidation() throws Exception {
|
public TestAbfsConfigurationFieldsValidation() throws Exception {
|
||||||
|
@ -142,8 +146,26 @@ public class TestAbfsConfigurationFieldsValidation {
|
||||||
assertEquals(this.encodedAccountKey, accountKey);
|
assertEquals(this.encodedAccountKey, accountKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (expected = ConfigurationPropertyNotFoundException.class)
|
@Test(expected = ConfigurationPropertyNotFoundException.class)
|
||||||
public void testGetAccountKeyWithNonExistingAccountName() throws Exception {
|
public void testGetAccountKeyWithNonExistingAccountName() throws Exception {
|
||||||
abfsConfiguration.getStorageAccountKey("bogusAccountName");
|
abfsConfiguration.getStorageAccountKey("bogusAccountName");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testSSLSocketFactoryConfiguration() throws InvalidConfigurationValueException, IllegalAccessException {
|
||||||
|
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.Default_JSSE);
|
||||||
|
AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration);
|
||||||
|
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
|
||||||
|
configuration = new Configuration();
|
||||||
|
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.OpenSSL);
|
||||||
|
localAbfsConfiguration = new AbfsConfiguration(configuration);
|
||||||
|
assertEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ import java.net.URL;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -34,16 +35,29 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
*/
|
*/
|
||||||
public final class TestAbfsClient {
|
public final class TestAbfsClient {
|
||||||
|
|
||||||
|
private void validateUserAgent(String expectedPattern,
|
||||||
|
URL baseUrl,
|
||||||
|
AbfsConfiguration config,
|
||||||
|
boolean includeSSLProvider) {
|
||||||
|
AbfsClient client = new AbfsClient(baseUrl, null,
|
||||||
|
config, null);
|
||||||
|
String sslProviderName = null;
|
||||||
|
if (includeSSLProvider) {
|
||||||
|
sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName();
|
||||||
|
}
|
||||||
|
String userAgent = client.initializeUserAgent(config, sslProviderName);
|
||||||
|
Pattern pattern = Pattern.compile(expectedPattern);
|
||||||
|
Assert.assertTrue(pattern.matcher(userAgent).matches());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void verifyUnknownUserAgent() throws Exception {
|
public void verifyUnknownUserAgent() throws Exception {
|
||||||
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)";
|
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)";
|
||||||
final Configuration configuration = new Configuration();
|
final Configuration configuration = new Configuration();
|
||||||
configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
|
configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
|
||||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||||
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
|
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
|
||||||
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
|
abfsConfiguration, false);
|
||||||
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
|
|
||||||
Assert.assertTrue(pattern.matcher(userAgent).matches());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -52,9 +66,19 @@ public final class TestAbfsClient {
|
||||||
final Configuration configuration = new Configuration();
|
final Configuration configuration = new Configuration();
|
||||||
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
|
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
|
||||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||||
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
|
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
|
||||||
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
|
abfsConfiguration, false);
|
||||||
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
|
}
|
||||||
Assert.assertTrue(pattern.matcher(userAgent).matches());
|
|
||||||
|
@Test
|
||||||
|
public void verifyUserAgentWithSSLProvider() throws Exception {
|
||||||
|
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+) SunJSSE-1.8\\) Partner Service";
|
||||||
|
final Configuration configuration = new Configuration();
|
||||||
|
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
|
||||||
|
configuration.set(ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY,
|
||||||
|
SSLSocketFactoryEx.SSLChannelMode.Default_JSSE.name());
|
||||||
|
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||||
|
validateUserAgent(expectedUserAgentPattern, new URL("https://azure.com"),
|
||||||
|
abfsConfiguration, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue