HADOOP-13863. Azure: Add a new SAS key mode for WASB. Contributed by Dushyanth

This commit is contained in:
Mingliang Liu 2016-12-22 17:33:25 -08:00
parent 22befbd585
commit e92a77099b
15 changed files with 1660 additions and 12 deletions

View File

@ -1222,6 +1222,38 @@
<description>The implementation class of the S3A AbstractFileSystem.</description>
</property>
<!-- Azure file system properties -->
<property>
<name>fs.azure.secure.mode</name>
<value>false</value>
<description>
Config flag to identify the mode in which fs.azure.NativeAzureFileSystem needs
to run under. Setting it "true" would make fs.azure.NativeAzureFileSystem use
SAS keys to communicate with Azure storage.
</description>
</property>
<property>
<name>fs.azure.local.sas.key.mode</name>
<value>false</value>
<description>
Works in conjuction with fs.azure.secure.mode. Setting this config to true
results in fs.azure.NativeAzureFileSystem using the local SAS key generation
where the SAS keys are generating in the same process as fs.azure.NativeAzureFileSystem.
If fs.azure.secure.mode flag is set to false, this flag has no effect.
</description>
</property>
<property>
<name>fs.azure.sas.expiry.period</name>
<value>90d</value>
<description>
The default value to be used for expiration period for SAS keys generated.
Can use the following suffix (case insensitive):
ms(millis), s(sec), m(min), h(hour), d(day)
to specify the time (such as 2s, 2m, 1h, etc.).
</description>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
@ -2420,5 +2452,4 @@
in audit logs.
</description>
</property>
</configuration>

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
@ -150,6 +149,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
/**
* Configuration keys to identify if WASB needs to run in Secure mode. In Secure mode
* all interactions with Azure storage is performed using SAS uris. There are two sub modes
* within the Secure mode , one is remote SAS key mode where the SAS keys are generated
* from a remote process and local mode where SAS keys are generated within WASB.
*/
@VisibleForTesting
public static final String KEY_USE_SECURE_MODE = "fs.azure.secure.mode";
/**
* By default the SAS Key mode is expected to run in Romote key mode. This flags sets it
* to run on the local mode.
*/
public static final String KEY_USE_LOCAL_SAS_KEY_MODE = "fs.azure.local.sas.key.mode";
private static final String PERMISSION_METADATA_KEY = "hdi_permission";
private static final String OLD_PERMISSION_METADATA_KEY = "asv_permission";
private static final String IS_FOLDER_METADATA_KEY = "hdi_isfolder";
@ -231,6 +245,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
/**
* Default values to control SAS Key mode.
* By default we set the values to false.
*/
private static final boolean DEFAULT_USE_SECURE_MODE = false;
private static final boolean DEFAULT_USE_LOCAL_SAS_KEY_MODE = false;
/**
* Enable flat listing of blobs as default option. This is useful only if
* listing depth is AZURE_UNBOUNDED_DEPTH.
@ -278,6 +299,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Set if we're running against a storage emulator..
private boolean isStorageEmulator = false;
// Configs controlling WASB SAS key mode.
private boolean useSecureMode = false;
private boolean useLocalSasKeyMode = false;
private String delegationToken;
/**
* A test hook interface that can modify the operation context we use for
* Azure Storage operations, e.g. to inject errors.
@ -410,16 +436,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation)
throws IllegalArgumentException, AzureException, IOException {
if (null == instrumentation) {
throw new IllegalArgumentException("Null instrumentation");
}
this.instrumentation = instrumentation;
if (null == this.storageInteractionLayer) {
this.storageInteractionLayer = new StorageInterfaceImpl();
}
// Check that URI exists.
//
if (null == uri) {
@ -446,6 +468,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
sessionUri = uri;
sessionConfiguration = conf;
useSecureMode = conf.getBoolean(KEY_USE_SECURE_MODE,
DEFAULT_USE_SECURE_MODE);
useLocalSasKeyMode = conf.getBoolean(KEY_USE_LOCAL_SAS_KEY_MODE,
DEFAULT_USE_LOCAL_SAS_KEY_MODE);
if (null == this.storageInteractionLayer) {
if (!useSecureMode) {
this.storageInteractionLayer = new StorageInterfaceImpl();
} else {
this.storageInteractionLayer = new SecureStorageInterfaceImpl(
useLocalSasKeyMode, conf, delegationToken);
}
}
// Start an Azure storage session.
//
createAzureStorageSession();
@ -790,6 +826,31 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
configureAzureStorageSession();
}
/**
* Method to set up the Storage Interaction layer in Secure mode.
* @param accountName - Storage account provided in the initializer
* @param containerName - Container name provided in the initializer
* @param sessionUri - URI provided in the initializer
*/
private void connectToAzureStorageInSecureMode(String accountName,
String containerName, URI sessionUri)
throws AzureException, StorageException, URISyntaxException {
// Assertion: storageInteractionLayer instance has to be a SecureStorageInterfaceImpl
if (!(this.storageInteractionLayer instanceof SecureStorageInterfaceImpl)) {
throw new AssertionError("connectToAzureStorageInSASKeyMode() should be called only"
+ " for SecureStorageInterfaceImpl instances");
}
((SecureStorageInterfaceImpl) this.storageInteractionLayer).
setStorageAccountName(accountName);
container = storageInteractionLayer.getContainerReference(containerName);
rootDirectory = container.getDirectoryReference("");
canCreateOrModifyContainer = true;
}
/**
* Connect to Azure storage using account key credentials.
*/
@ -920,6 +981,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return;
}
// If the securemode flag is set, WASB uses SecureStorageInterfaceImpl instance
// to communicate with Azure storage. In SecureStorageInterfaceImpl SAS keys
// are used to communicate with Azure storage, so connectToAzureStorageInSecureMode
// instantiates the default container using a SAS Key.
if (useSecureMode) {
connectToAzureStorageInSecureMode(accountName, containerName, sessionUri);
return;
}
// Check whether we have a shared access signature for that container.
String propertyValue = sessionConfiguration.get(KEY_ACCOUNT_SAS_PREFIX
+ containerName + "." + accountName);
@ -1330,7 +1400,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*/
private OutputStream openOutputStream(final CloudBlobWrapper blob)
throws StorageException {
if (blob instanceof CloudPageBlobWrapperImpl){
if (blob instanceof CloudPageBlobWrapper){
return new PageBlobOutputStream(
(CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
} else {

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.SharedAccessAccountPermissions;
import com.microsoft.azure.storage.SharedAccessAccountPolicy;
import com.microsoft.azure.storage.SharedAccessAccountResourceType;
import com.microsoft.azure.storage.SharedAccessAccountService;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
/***
* Local SAS Key Generation implementation. This class resides in
* the same address space as the WASB driver.
*
* This class gets typically used for testing purposes.
*
*/
public class LocalSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
/**
* Map to cache CloudStorageAccount instances.
*/
private Map<String, CloudStorageAccount> storageAccountMap;
private static final int HOURS_IN_DAY = 24;
public LocalSASKeyGeneratorImpl(Configuration conf) {
super(conf);
storageAccountMap = new HashMap<String, CloudStorageAccount>();
}
/**
* Implementation to generate SAS Key for a container
*/
@Override
public URI getContainerSASUri(String accountName, String container)
throws SASKeyGenerationException {
try {
CloudStorageAccount account =
getSASKeyBasedStorageAccountInstance(accountName);
CloudBlobClient client = account.createCloudBlobClient();
return client.getCredentials().transformUri(
client.getContainerReference(container).getUri());
} catch (StorageException stoEx) {
throw new SASKeyGenerationException("Encountered StorageException while"
+ " generating SAS Key for container " + container + " inside "
+ "storage account " + accountName, stoEx);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException while"
+ " generating SAS Key for container " + container + " inside storage"
+ " account " + accountName, uriSyntaxEx);
}
}
/**
* Helper method that creates a CloudStorageAccount instance based on
* SAS key for accountName
*
* @param accountName Storage Account Name
* @return CloudStorageAccount instance created using SAS key for
* the Storage Account.
* @throws SASKeyGenerationException
*/
private CloudStorageAccount getSASKeyBasedStorageAccountInstance(
String accountName) throws SASKeyGenerationException {
try {
String accountNameWithoutDomain =
getAccountNameWithoutDomain(accountName);
CloudStorageAccount account =
getStorageAccountInstance(accountNameWithoutDomain,
AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
accountName, getConf()));
return new CloudStorageAccount(
new StorageCredentialsSharedAccessSignature(
account.generateSharedAccessSignature(
getDefaultAccountAccessPolicy())), false,
account.getEndpointSuffix(), accountNameWithoutDomain);
} catch (KeyProviderException keyProviderEx) {
throw new SASKeyGenerationException("Encountered KeyProviderException"
+ " while retrieving Storage key from configuration for account "
+ accountName, keyProviderEx);
} catch (InvalidKeyException invalidKeyEx) {
throw new SASKeyGenerationException("Encoutered InvalidKeyException "
+ "while generating Account level SAS key for account" + accountName,
invalidKeyEx);
} catch(StorageException storeEx) {
throw new SASKeyGenerationException("Encoutered StorageException while "
+ "generating Account level SAS key for account" + accountName,
storeEx);
} catch(URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException for"
+ " account " + accountName, uriSyntaxEx);
}
}
/**
* Implementation for generation of Relative Path Blob SAS Uri.
*/
@Override
public URI getRelativeBlobSASUri(String accountName, String container,
String relativePath) throws SASKeyGenerationException {
CloudBlobContainer sc = null;
CloudBlobClient client = null;
try {
CloudStorageAccount account =
getSASKeyBasedStorageAccountInstance(accountName);
client = account.createCloudBlobClient();
sc = client.getContainerReference(container);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "while getting container references for container " + container
+ " inside storage account : " + accountName, uriSyntaxEx);
} catch (StorageException stoEx) {
throw new SASKeyGenerationException("Encountered StorageException while "
+ "getting container references for container " + container
+ " inside storage account : " + accountName, stoEx);
}
CloudBlockBlob blob = null;
try {
blob = sc.getBlockBlobReference(relativePath);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException while "
+ "getting Block Blob references for container " + container
+ " inside storage account : " + accountName, uriSyntaxEx);
} catch (StorageException stoEx) {
throw new SASKeyGenerationException("Encountered StorageException while "
+ "getting Block Blob references for container " + container
+ " inside storage account : " + accountName, stoEx);
}
try {
return client.getCredentials().transformUri(blob.getUri());
} catch (StorageException stoEx) {
throw new SASKeyGenerationException("Encountered StorageException while "
+ "generating SAS key for Blob: " + relativePath + " inside "
+ "container : " + container + " in Storage Account : " + accountName,
stoEx);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "while generating SAS key for Blob: " + relativePath + " inside "
+ "container: " + container + " in Storage Account : " + accountName,
uriSyntaxEx);
}
}
/**
* Helper method that creates CloudStorageAccount Instance using the
* storage account key.
* @param accountName Name of the storage account
* @param accountKey Storage Account key
* @return CloudStorageAccount instance for the storage account.
* @throws SASKeyGenerationException
*/
private CloudStorageAccount getStorageAccountInstance(String accountName,
String accountKey) throws SASKeyGenerationException {
if (!storageAccountMap.containsKey(accountName)) {
CloudStorageAccount account = null;
try {
account =
new CloudStorageAccount(new StorageCredentialsAccountAndKey(
accountName, accountKey));
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "for account " + accountName, uriSyntaxEx);
}
storageAccountMap.put(accountName, account);
}
return storageAccountMap.get(accountName);
}
/**
* Helper method that returns the Storage account name without
* the domain name suffix.
* @param fullAccountName Storage account name with domain name suffix
* @return String
*/
private String getAccountNameWithoutDomain(String fullAccountName) {
StringTokenizer tokenizer = new StringTokenizer(fullAccountName, ".");
return tokenizer.nextToken();
}
/**
* Helper method to generate Access Policy for the Storage Account SAS Key
* @return SharedAccessAccountPolicy
*/
private SharedAccessAccountPolicy getDefaultAccountAccessPolicy() {
SharedAccessAccountPolicy ap =
new SharedAccessAccountPolicy();
Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
cal.setTime(new Date());
cal.add(Calendar.HOUR, (int) getSasKeyExpiryPeriod() * HOURS_IN_DAY);
ap.setSharedAccessExpiryTime(cal.getTime());
ap.setPermissions(getDefaultAccoutSASKeyPermissions());
ap.setResourceTypes(EnumSet.of(SharedAccessAccountResourceType.CONTAINER,
SharedAccessAccountResourceType.OBJECT));
ap.setServices(EnumSet.of(SharedAccessAccountService.BLOB));
return ap;
}
private EnumSet<SharedAccessAccountPermissions> getDefaultAccoutSASKeyPermissions() {
return EnumSet.of(SharedAccessAccountPermissions.ADD,
SharedAccessAccountPermissions.CREATE,
SharedAccessAccountPermissions.DELETE,
SharedAccessAccountPermissions.LIST,
SharedAccessAccountPermissions.READ,
SharedAccessAccountPermissions.UPDATE,
SharedAccessAccountPermissions.WRITE);
}
}

View File

@ -0,0 +1,296 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
/**
* Class implementing a RemoteSASKeyGenerator. This class
* uses the url passed in via the Configuration to make a
* rest call to generate the required SAS Key.
*/
public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
public static final Logger LOG =
LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
/**
* Configuration parameter name expected in the Configuration
* object to provide the url of the remote service {@value}
*/
private static final String KEY_CRED_SERVICE_URL =
"fs.azure.cred.service.url";
/**
* Container SAS Key generation OP name. {@value}
*/
private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
/**
* Relative Blob SAS Key generation OP name. {@value}
*/
private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
/**
* Query parameter specifying the expiry period to be used for sas key
* {@value}
*/
private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
/**
* Query parameter name for the storage account. {@value}
*/
private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
"storage_account";
/**
* Query parameter name for the storage account container. {@value}
*/
private static final String CONTAINER_QUERY_PARAM_NAME =
"container";
/**
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation_token";
/**
* Query parameter name for the relative path inside the storage
* account container. {@value}
*/
private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
"relative_path";
private String delegationToken = "";
private String credServiceUrl = "";
private WasbRemoteCallHelper remoteCallHelper = null;
public RemoteSASKeyGeneratorImpl(Configuration conf) {
super(conf);
}
public boolean initialize(Configuration conf, String delegationToken) {
LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
if (delegationToken == null || delegationToken.isEmpty()) {
LOG.error("Delegation Token not provided for initialization"
+ " of RemoteSASKeyGenerator");
return false;
}
this.delegationToken = delegationToken;
if (credServiceUrl == null || credServiceUrl.isEmpty()) {
LOG.error("CredService Url not found in configuration to initialize"
+ " RemoteSASKeyGenerator");
return false;
}
remoteCallHelper = new WasbRemoteCallHelper();
LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
return true;
}
@Override
public URI getContainerSASUri(String storageAccount, String container)
throws SASKeyGenerationException {
try {
LOG.debug("Generating Container SAS Key for Container {} "
+ "inside Storage Account {} ", container, storageAccount);
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
uriBuilder.setPath("/" + CONTAINER_SAS_OP);
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
container);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
RemoteSASKeyGenerationResponse sasKeyResponse =
makeRemoteRequest(uriBuilder.build());
if (sasKeyResponse == null) {
throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ " object null from remote call");
} else if (sasKeyResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException("Remote Service encountered error"
+ " in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
}
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "while building the HttpGetRequest to remote cred service",
uriSyntaxEx);
}
}
@Override
public URI getRelativeBlobSASUri(String storageAccount, String container,
String relativePath) throws SASKeyGenerationException {
try {
LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
+ " Container {} inside Storage Account {} ",
relativePath, container, storageAccount);
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
uriBuilder.setPath("/" + BLOB_SAS_OP);
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
container);
uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
relativePath);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
RemoteSASKeyGenerationResponse sasKeyResponse =
makeRemoteRequest(uriBuilder.build());
if (sasKeyResponse == null) {
throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ " object null from remote call");
} else if (sasKeyResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException("Remote Service encountered error"
+ " in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
}
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException"
+ " while building the HttpGetRequest to " + " remote service",
uriSyntaxEx);
}
}
/**
* Helper method to make a remote request.
* @param uri - Uri to use for the remote request
* @return RemoteSASKeyGenerationResponse
*/
private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
throws SASKeyGenerationException {
try {
String responseBody =
remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(responseBody,
RemoteSASKeyGenerationResponse.class);
} catch (WasbRemoteCallException remoteCallEx) {
throw new SASKeyGenerationException("Encountered RemoteCallException"
+ " while retrieving SAS key from remote service", remoteCallEx);
} catch (JsonParseException jsonParserEx) {
throw new SASKeyGenerationException("Encountered JsonParseException "
+ "while parsing the response from remote"
+ " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
} catch (JsonMappingException jsonMappingEx) {
throw new SASKeyGenerationException("Encountered JsonMappingException"
+ " while mapping the response from remote service into "
+ "RemoteSASKeyGenerationResponse object", jsonMappingEx);
} catch (IOException ioEx) {
throw new SASKeyGenerationException("Encountered IOException while "
+ "accessing remote service to retrieve SAS Key", ioEx);
}
}
}
/**
* POJO representing the response expected from a Remote
* SAS Key generation service.
* The remote SAS Key generation service is expected to
* return SAS key in json format:
* {
* "responseCode" : 0 or non-zero <int>,
* "responseMessage" : relavant message on failure <String>,
* "sasKey" : Requested SAS Key <String>
* }
*/
class RemoteSASKeyGenerationResponse {
/**
* Response code for the call.
*/
private int responseCode;
/**
* An intelligent message corresponding to
* result. Specifically in case of failure
* the reason for failure.
*/
private String responseMessage;
/**
* SAS Key corresponding to the request.
*/
private String sasKey;
public int getResponseCode() {
return responseCode;
}
public void setResponseCode(int responseCode) {
this.responseCode = responseCode;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
public String getSasKey() {
return sasKey;
}
public void setSasKey(String sasKey) {
this.sasKey = sasKey;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
/**
* Exception that gets thrown during generation of SAS Key.
*
*/
public class SASKeyGenerationException extends AzureException {
private static final long serialVersionUID = 1L;
public SASKeyGenerationException(String message) {
super(message);
}
public SASKeyGenerationException(String message, Throwable cause) {
super(message, cause);
}
public SASKeyGenerationException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
/**
* Abstract base class for the SAS Key Generator implementation
*
*/
public abstract class SASKeyGeneratorImpl implements SASKeyGeneratorInterface {
/**
* Configuration key to be used to specify the expiry period for SAS keys
* This value currently is specified in days. {@value}
*/
public static final String KEY_SAS_KEY_EXPIRY_PERIOD =
"fs.azure.sas.expiry.period";
/**
* Default value for the SAS key expiry period in days. {@value}
*/
public static final long DEFAUL_CONTAINER_SAS_KEY_PERIOD = 90;
private long sasKeyExpiryPeriod;
private Configuration conf;
public SASKeyGeneratorImpl(Configuration conf) {
this.conf = conf;
this.sasKeyExpiryPeriod = conf.getTimeDuration(
KEY_SAS_KEY_EXPIRY_PERIOD, DEFAUL_CONTAINER_SAS_KEY_PERIOD,
TimeUnit.DAYS);
}
public long getSasKeyExpiryPeriod() {
return sasKeyExpiryPeriod;
}
public Configuration getConf() {
return conf;
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.net.URI;
/**
* Iterface used by AzureNativeFileSysteStore to retrieve SAS Keys for the
* respective azure storage entity. This interface is expected to be
* implemented in two modes:
* 1) Local Mode: In this mode SAS Keys are generated
* in same address space as the WASB. This will be primarily used for
* testing purposes.
* 2) Remote Mode: In this mode SAS Keys are generated in a sepearte process
* other than WASB and will be communicated via client.
*/
public interface SASKeyGeneratorInterface {
/**
* Interface method to retrieve SAS Key for a container within the storage
* account.
*
* @param accountName
* - Storage account name
* @param container
* - Container name within the storage account.
* @return SAS URI for the container.
* @throws SASKeyGenerationException
*/
URI getContainerSASUri(String accountName, String container)
throws SASKeyGenerationException;
/**
* Interface method to retrieve SAS Key for a blob within the container of the
* storage account.
*
* @param accountName
* - Storage account name
* @param container
* - Container name within the storage account.
* @param relativePath
* - Relative path within the container
* @return SAS URI for the relative path blob.
* @throws SASKeyGenerationException
*/
URI getRelativeBlobSASUri(String accountName, String container,
String relativePath) throws SASKeyGenerationException;
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
/**
* Exception that is thrown when any error is encountered
* is SAS Mode operation of WASB.
*/
public class SecureModeException extends AzureException {
private static final long serialVersionUID = 1L;
public SecureModeException(String message) {
super(message);
}
public SecureModeException(String message, Throwable cause) {
super(message, cause);
}
public SecureModeException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,565 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import com.microsoft.azure.storage.blob.CopyState;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.PageRange;
import com.microsoft.azure.storage.blob.BlockEntry;
import org.apache.hadoop.classification.InterfaceAudience;
/***
* An implementation of the StorageInterface for SAS Key mode.
*
*/
public class SecureStorageInterfaceImpl extends StorageInterface {
public static final Logger LOG = LoggerFactory.getLogger(
SecureStorageInterfaceImpl.class);
public static final String SAS_ERROR_CODE = "SAS Error";
private SASKeyGeneratorInterface sasKeyGenerator;
private String storageAccount;
private String delegationToken;
public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
Configuration conf, String delegationToken)
throws SecureModeException {
this.delegationToken = delegationToken;
if (useLocalSASKeyMode) {
this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
} else {
RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
new RemoteSASKeyGeneratorImpl(conf);
if (!remoteSasKeyGenerator.initialize(conf, this.delegationToken)) {
throw new SecureModeException("Remote SAS Key mode could"
+ " not be initialized");
}
this.sasKeyGenerator = remoteSasKeyGenerator;
}
}
@Override
public void setTimeoutInMs(int timeoutInMs) {
}
@Override
public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
}
@Override
public void createBlobClient(CloudStorageAccount account) {
String errorMsg = "createBlobClient is an invalid operation in"
+ " SAS Key Mode";
LOG.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
@Override
public void createBlobClient(URI baseUri) {
String errorMsg = "createBlobClient is an invalid operation in "
+ "SAS Key Mode";
LOG.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
@Override
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
String errorMsg = "createBlobClient is an invalid operation in SAS "
+ "Key Mode";
LOG.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
@Override
public StorageCredentials getCredentials() {
String errorMsg = "getCredentials is an invalid operation in SAS "
+ "Key Mode";
LOG.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
@Override
public CloudBlobContainerWrapper getContainerReference(String name)
throws URISyntaxException, StorageException {
try {
return new SASCloudBlobContainerWrapperImpl(storageAccount,
new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
storageAccount, name)), sasKeyGenerator);
} catch (SASKeyGenerationException sasEx) {
String errorMsg = "Encountered SASKeyGeneration exception while "
+ "generating SAS Key for container : " + name
+ " inside Storage account : " + storageAccount;
LOG.error(errorMsg);
throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
}
}
public void setStorageAccountName(String storageAccount) {
this.storageAccount = storageAccount;
}
@InterfaceAudience.Private
static class SASCloudBlobContainerWrapperImpl
extends CloudBlobContainerWrapper {
private final CloudBlobContainer container;
private String storageAccount;
private SASKeyGeneratorInterface sasKeyGenerator;
public SASCloudBlobContainerWrapperImpl(String storageAccount,
CloudBlobContainer container, SASKeyGeneratorInterface sasKeyGenerator) {
this.storageAccount = storageAccount;
this.container = container;
this.sasKeyGenerator = sasKeyGenerator;
}
@Override
public String getName() {
return container.getName();
}
@Override
public boolean exists(OperationContext opContext) throws StorageException {
return container.exists(AccessCondition.generateEmptyCondition(), null,
opContext);
}
@Override
public void create(OperationContext opContext) throws StorageException {
container.create(null, opContext);
}
@Override
public HashMap<String, String> getMetadata() {
return container.getMetadata();
}
@Override
public void setMetadata(HashMap<String, String> metadata) {
container.setMetadata(metadata);
}
@Override
public void downloadAttributes(OperationContext opContext)
throws StorageException {
container.downloadAttributes(AccessCondition.generateEmptyCondition(),
null, opContext);
}
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
container.uploadMetadata(AccessCondition.generateEmptyCondition(), null,
opContext);
}
@Override
public CloudBlobDirectoryWrapper getDirectoryReference(String relativePath)
throws URISyntaxException, StorageException {
CloudBlobDirectory dir = container.getDirectoryReference(relativePath);
return new SASCloudBlobDirectoryWrapperImpl(dir);
}
@Override
public CloudBlobWrapper getBlockBlobReference(String relativePath)
throws URISyntaxException, StorageException {
try {
return new SASCloudBlockBlobWrapperImpl(
new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
storageAccount, getName(), relativePath)));
} catch (SASKeyGenerationException sasEx) {
String errorMsg = "Encountered SASKeyGeneration exception while "
+ "generating SAS Key for relativePath : " + relativePath
+ " inside container : " + getName() + " Storage account : " + storageAccount;
LOG.error(errorMsg);
throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
}
}
@Override
public CloudBlobWrapper getPageBlobReference(String relativePath)
throws URISyntaxException, StorageException {
try {
return new SASCloudPageBlobWrapperImpl(
new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
storageAccount, getName(), relativePath)));
} catch (SASKeyGenerationException sasEx) {
String errorMsg = "Encountered SASKeyGeneration exception while "
+ "generating SAS Key for relativePath : " + relativePath
+ " inside container : " + getName()
+ " Storage account : " + storageAccount;
LOG.error(errorMsg);
throw new StorageException(SAS_ERROR_CODE, errorMsg, sasEx);
}
}
}
//
// WrappingIterator
//
/**
* This iterator wraps every ListBlobItem as they come from the listBlobs()
* calls to their proper wrapping objects.
*/
private static class SASWrappingIterator implements Iterator<ListBlobItem> {
private final Iterator<ListBlobItem> present;
public SASWrappingIterator(Iterator<ListBlobItem> present) {
this.present = present;
}
public static Iterable<ListBlobItem> wrap(
final Iterable<ListBlobItem> present) {
return new Iterable<ListBlobItem>() {
@Override
public Iterator<ListBlobItem> iterator() {
return new SASWrappingIterator(present.iterator());
}
};
}
@Override
public boolean hasNext() {
return present.hasNext();
}
@Override
public ListBlobItem next() {
ListBlobItem unwrapped = present.next();
if (unwrapped instanceof CloudBlobDirectory) {
return new SASCloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
} else if (unwrapped instanceof CloudBlockBlob) {
return new SASCloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
} else if (unwrapped instanceof CloudPageBlob) {
return new SASCloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
} else {
return unwrapped;
}
}
@Override
public void remove() {
present.remove();
}
}
//
// CloudBlobDirectoryWrapperImpl
//
@InterfaceAudience.Private
static class SASCloudBlobDirectoryWrapperImpl extends CloudBlobDirectoryWrapper {
private final CloudBlobDirectory directory;
public SASCloudBlobDirectoryWrapperImpl(CloudBlobDirectory directory) {
this.directory = directory;
}
@Override
public URI getUri() {
return directory.getUri();
}
@Override
public Iterable<ListBlobItem> listBlobs(String prefix,
boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
BlobRequestOptions options, OperationContext opContext)
throws URISyntaxException, StorageException {
return SASWrappingIterator.wrap(directory.listBlobs(prefix,
useFlatBlobListing, listingDetails, options, opContext));
}
@Override
public CloudBlobContainer getContainer() throws URISyntaxException,
StorageException {
return directory.getContainer();
}
@Override
public CloudBlobDirectory getParent() throws URISyntaxException,
StorageException {
return directory.getParent();
}
@Override
public StorageUri getStorageUri() {
return directory.getStorageUri();
}
}
abstract static class SASCloudBlobWrapperImpl implements CloudBlobWrapper {
private final CloudBlob blob;
@Override
public CloudBlob getBlob() {
return blob;
}
public URI getUri() {
return getBlob().getUri();
}
protected SASCloudBlobWrapperImpl(CloudBlob blob) {
this.blob = blob;
}
@Override
public HashMap<String, String> getMetadata() {
return getBlob().getMetadata();
}
@Override
public void delete(OperationContext opContext, SelfRenewingLease lease)
throws StorageException {
getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
null, opContext);
}
/**
* Return and access condition for this lease, or else null if
* there's no lease.
*/
private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
AccessCondition leaseCondition = null;
if (lease != null) {
leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
}
return leaseCondition;
}
@Override
public boolean exists(OperationContext opContext)
throws StorageException {
return getBlob().exists(null, null, opContext);
}
@Override
public void downloadAttributes(
OperationContext opContext) throws StorageException {
getBlob().downloadAttributes(null, null, opContext);
}
@Override
public BlobProperties getProperties() {
return getBlob().getProperties();
}
@Override
public void setMetadata(HashMap<String, String> metadata) {
getBlob().setMetadata(metadata);
}
@Override
public InputStream openInputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return getBlob().openInputStream(null, options, opContext);
}
public OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
}
public void upload(InputStream sourceStream, OperationContext opContext)
throws StorageException, IOException {
getBlob().upload(sourceStream, 0, null, null, opContext);
}
@Override
public CloudBlobContainer getContainer() throws URISyntaxException,
StorageException {
return getBlob().getContainer();
}
@Override
public CloudBlobDirectory getParent() throws URISyntaxException,
StorageException {
return getBlob().getParent();
}
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
uploadMetadata(null, null, opContext);
}
@Override
public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
OperationContext opContext) throws StorageException{
getBlob().uploadMetadata(accessConditions, options, opContext);
}
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
throws StorageException {
// Include lease in request if lease not null.
getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
}
@Override
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
}
@Override
public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
}
@Override
public StorageUri getStorageUri() {
return getBlob().getStorageUri();
}
@Override
public CopyState getCopyState() {
return getBlob().getCopyState();
}
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
throws StorageException, URISyntaxException {
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
null, null, options, opContext);
}
@Override
public void downloadRange(long offset, long length, OutputStream outStream,
BlobRequestOptions options, OperationContext opContext)
throws StorageException, IOException {
getBlob().downloadRange(offset, length, outStream, null, options, opContext);
}
@Override
public SelfRenewingLease acquireLease() throws StorageException {
return new SelfRenewingLease(this);
}
}
//
// CloudBlockBlobWrapperImpl
//
static class SASCloudBlockBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudBlockBlobWrapper {
public SASCloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
super(blob);
}
public OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
}
public void upload(InputStream sourceStream, OperationContext opContext)
throws StorageException, IOException {
getBlob().upload(sourceStream, 0, null, null, opContext);
}
public void uploadProperties(OperationContext opContext)
throws StorageException {
getBlob().uploadProperties(null, null, opContext);
}
@Override
public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
}
@Override
public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
}
}
static class SASCloudPageBlobWrapperImpl extends SASCloudBlobWrapperImpl implements CloudPageBlobWrapper {
public SASCloudPageBlobWrapperImpl(CloudPageBlob blob) {
super(blob);
}
public void create(final long length, BlobRequestOptions options,
OperationContext opContext) throws StorageException {
((CloudPageBlob) getBlob()).create(length, null, options, opContext);
}
public void uploadPages(final InputStream sourceStream, final long offset,
final long length, BlobRequestOptions options, OperationContext opContext)
throws StorageException, IOException {
((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
options, opContext);
}
public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
OperationContext opContext) throws StorageException {
return ((CloudPageBlob) getBlob()).downloadPageRanges(
null, options, opContext);
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
/**
* Exception that gets thrown when a remote call
* made from WASB to external cred service fails.
*/
public class WasbRemoteCallException extends AzureException {
private static final long serialVersionUID = 1L;
public WasbRemoteCallException(String message) {
super(message);
}
public WasbRemoteCallException(String message, Throwable cause) {
super(message, cause);
}
public WasbRemoteCallException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
/**
* Helper class the has constants and helper methods
* used in WASB when integrating with a remote http cred
* service. Currently, remote service will be used to generate
* SAS keys.
*/
class WasbRemoteCallHelper {
/**
* Return code when the remote call is successful. {@value}
*/
public static final int REMOTE_CALL_SUCCESS_CODE = 0;
/**
* Client instance to be used for making the remote call.
*/
private HttpClient client = null;
public WasbRemoteCallHelper() {
this.client = HttpClientBuilder.create().build();
}
/**
* Helper method to make remote HTTP Get request.
* @param getRequest - HttpGet request object constructed by caller.
* @return Http Response body returned as a string. The caller
* is expected to semantically understand the response.
* @throws WasbRemoteCallException
*/
public String makeRemoteGetRequest(HttpGet getRequest)
throws WasbRemoteCallException {
try {
HttpResponse response = client.execute(getRequest);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new WasbRemoteCallException(
response.getStatusLine().toString());
}
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(),
StandardCharsets.UTF_8));
StringBuilder responseBody = new StringBuilder();
String responseLine = "";
while ((responseLine = rd.readLine()) != null) {
responseBody.append(responseLine);
}
rd.close();
return responseBody.toString();
} catch (ClientProtocolException clientProtocolEx) {
throw new WasbRemoteCallException("Encountered ClientProtocolException"
+ " while making remote call", clientProtocolEx);
} catch (IOException ioEx) {
throw new WasbRemoteCallException("Encountered IOException while making"
+ " remote call", ioEx);
}
}
}

View File

@ -285,6 +285,54 @@ To enable 20 threads for Rename operation. Set configuration value to 0 or 1 to
<value>20</value>
</property>
### <a name="WASB_SECURE_MODE" />WASB Secure mode and configuration
WASB can operate in secure mode where the Storage access keys required to communicate with Azure storage does not have to
be in the same address space as the process using WASB. In this mode all interactions with Azure storage is performed using
SAS uris. There are two sub modes within the Secure mode, one is remote SAS key mode where the SAS keys are generated from
a remote process and local mode where SAS keys are generated within WASB. By default the SAS Key mode is expected to run in
Romote mode, however for testing purposes the local mode can be enabled to generate SAS keys in the same process as WASB.
To enable Secure mode following property needs to be set to true.
```
<property>
<name>fs.azure.secure.mode</name>
<value>true</value>
</property>
```
To enable SAS key generation locally following property needs to be set to true.
```
<property>
<name>fs.azure.local.sas.key.mode</name>
<value>true</value>
</property>
```
To use the remote SAS key generation mode, an external REST service is expected to provided required SAS keys.
Following property can used to provide the end point to use for remote SAS Key generation:
```
<property>
<name>fs.azure.cred.service.url</name>
<value>{URL}</value>
</property>
```
The remote service is expected to provide support for two REST calls ```{URL}/GET_CONTAINER_SAS``` and ```{URL}/GET_RELATIVE_BLOB_SAS```, for generating
container and relative blob sas keys. An example requests
```{URL}/GET_CONTAINER_SAS?storage_account=<account_name>&container=<container>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
```{URL}/GET_CONTAINER_SAS?storage_account=<account_name>&container=<container>&relative_path=<relative path>&sas_expiry=<expiry period>&delegation_token=<delegation token>```
The service is expected to return a response in JSON format:
```
{
"responseCode" : 0 or non-zero <int>,
"responseMessage" : relavant message on failure <String>,
"sasKey" : Requested SAS Key <String>
}
```
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import com.microsoft.azure.storage.blob.BlobOutputStream;
@ -41,7 +43,7 @@ import com.microsoft.azure.storage.blob.CloudBlockBlob;
*/
public class TestContainerChecks {
private AzureBlobStorageTestAccount testAccount;
private boolean runningInSASMode = false;
@After
public void tearDown() throws Exception {
if (testAccount != null) {
@ -50,6 +52,12 @@ public class TestContainerChecks {
}
}
@Before
public void setMode() {
runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
}
@Test
public void testContainerExistAfterDoesNotExist() throws Exception {
testAccount = AzureBlobStorageTestAccount.create("",
@ -155,6 +163,8 @@ public class TestContainerChecks {
@Test
public void testContainerChecksWithSas() throws Exception {
Assume.assumeFalse(runningInSASMode);
testAccount = AzureBlobStorageTestAccount.create("",
EnumSet.of(CreateOptions.UseSas));
assumeNotNull(testAccount);

View File

@ -39,7 +39,6 @@ import java.io.File;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
@ -48,6 +47,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -63,7 +64,7 @@ public class TestWasbUriAndConfiguration {
protected String accountName;
protected String accountKey;
protected static Configuration conf = null;
private boolean runningInSASMode = false;
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
@ -77,6 +78,12 @@ public class TestWasbUriAndConfiguration {
}
}
@Before
public void setMode() {
runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
}
private boolean validateIOStreams(Path filePath) throws IOException {
// Capture the file system from the test account.
FileSystem fs = testAccount.getFileSystem();
@ -128,6 +135,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testConnectUsingSAS() throws Exception {
Assume.assumeFalse(runningInSASMode);
// Create the test account with SAS credentials.
testAccount = AzureBlobStorageTestAccount.create("",
EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer));
@ -142,6 +151,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testConnectUsingSASReadonly() throws Exception {
Assume.assumeFalse(runningInSASMode);
// Create the test account with SAS credentials.
testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of(
CreateOptions.UseSas, CreateOptions.CreateContainer,
@ -318,6 +329,8 @@ public class TestWasbUriAndConfiguration {
@Test
public void testCredsFromCredentialProvider() throws Exception {
Assume.assumeFalse(runningInSASMode);
String account = "testacct";
String key = "testkey";
// set up conf to have a cred provider

View File

@ -17,15 +17,28 @@
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<!-- For tests against live azure, provide the following account information -->
<!--
<property>
<name>fs.azure.test.account.name</name>
<value>{ACCOUNTNAME}.blob.core.windows.net</value>
<value>{ACCOUNTNAME}.blob.core.windows.net</value>
</property>
<property>
<name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
<value>{ACCOUNTKEY}</value>
</property>
<property>
<name>fs.azure.secure.mode</name>
<value>false</value>
</property>
<property>
<name>fs.azure.local.sas.key.mode</name>
<value>false</value>
</property>
<property>
<name>fs.azure.cred.service.url</name>
<value>{CRED_SERIVCE_URL}</value>
</property>
-->
<!-- Save the above configuration properties in a separate file named -->