From b54b0c1b676c616aef9574e4e88ea30c314c79dc Mon Sep 17 00:00:00 2001 From: Thomas Marquardt Date: Sat, 11 Aug 2018 00:10:26 +0000 Subject: [PATCH] HADOOP-15659. Code changes for bug fix and new tests. Contributed by Da Zhou. --- hadoop-tools/hadoop-azure/pom.xml | 30 ++-- .../{services => }/AbfsConfiguration.java | 69 ++++++++- .../fs/azurebfs/AzureBlobFileSystem.java | 55 +++++-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 39 ++--- .../azurebfs/constants/ConfigurationKeys.java | 6 + .../constants/FileSystemConfigurations.java | 4 +- .../exceptions/KeyProviderException.java | 42 ++++++ .../services/AzureServiceErrorCode.java | 1 + .../services/ListResultEntrySchema.java | 2 +- .../contracts/services/ListResultSchema.java | 2 +- .../fs/azurebfs/services/AbfsClient.java | 26 ++-- .../azurebfs/services/AbfsHttpOperation.java | 19 +-- .../fs/azurebfs/services/AbfsInputStream.java | 2 +- .../azurebfs/services/AbfsOutputStream.java | 25 ++-- .../azurebfs/services/AbfsRestOperation.java | 2 +- .../services/AbfsUriQueryBuilder.java | 8 +- .../fs/azurebfs/services/KeyProvider.java | 42 ++++++ .../services/ShellDecryptionKeyProvider.java | 63 ++++++++ .../azurebfs/services/SimpleKeyProvider.java | 54 +++++++ .../azurebfs/AbstractAbfsIntegrationTest.java | 17 ++- .../hadoop/fs/azurebfs/ITestAbfsClient.java | 45 ++++++ .../ITestAbfsReadWriteAndSeek.java | 8 +- .../azurebfs/ITestAzureBlobFileSystemE2E.java | 2 +- .../ITestAzureBlobFileSystemE2EScale.java | 4 +- .../ITestAzureBlobFileSystemFinalize.java | 60 ++++++++ .../ITestAzureBlobFileSystemFlush.java | 136 +++++++++++++++++- ...ITestAzureBlobFileSystemInitAndCreate.java | 4 +- .../ITestAzureBlobFileSystemRename.java | 3 +- .../azurebfs/ITestFileSystemProperties.java | 4 - ...TestAbfsConfigurationFieldsValidation.java | 8 +- .../contract/AbfsFileSystemContract.java | 5 +- .../fs/azurebfs/services/TestAbfsClient.java | 60 ++++++++ .../TestShellDecryptionKeyProvider.java | 89 ++++++++++++ 33 files changed, 824 insertions(+), 112 deletions(-) rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/{services => }/AbfsConfiguration.java (83%) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/{services => }/ITestAbfsReadWriteAndSeek.java (90%) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/{services => }/TestAbfsConfigurationFieldsValidation.java (97%) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestShellDecryptionKeyProvider.java diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index cbd4dfb5b4b..7d0406c6cf8 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -149,17 +149,6 @@ provided - - com.fasterxml.jackson.core - jackson-core - compile - - - - com.fasterxml.jackson.core - jackson-databind - compile - org.apache.httpcomponents @@ -197,18 +186,25 @@ guava - - joda-time - joda-time - compile - - org.eclipse.jetty jetty-util-ajax compile + + org.codehaus.jackson + jackson-mapper-asl + compile + + + org.codehaus.jackson + jackson-core-asl + compile + + + + junit diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java similarity index 83% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 8def1bba20a..1fb5df9aa39 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.azurebfs.services; +package org.apache.hadoop.fs.azurebfs; import java.lang.reflect.Field; import java.util.Map; @@ -33,13 +33,17 @@ import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidati import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.services.KeyProvider; +import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider; /** * Configuration for Azure Blob FileSystem. @@ -111,10 +115,23 @@ public class AbfsConfiguration{ DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, + DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION) + private boolean skipUserGroupMetadataDuringInitialization; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) private int readAheadQueueDepth; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH, + DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH) + private boolean enableFlush; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, + DefaultValue = "") + private String userAgentId; + private Map storageAccountKeys; public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { @@ -147,13 +164,38 @@ public class AbfsConfiguration{ return this.isSecure; } - public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { - String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); - if (accountKey == null) { + public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException { + String key; + String keyProviderClass = + configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); + KeyProvider keyProvider; + + if (keyProviderClass == null) { + // No key provider was provided so use the provided key as is. + keyProvider = new SimpleKeyProvider(); + } else { + // create an instance of the key provider class and verify it + // implements KeyProvider + Object keyProviderObject; + try { + Class clazz = configuration.getClassByName(keyProviderClass); + keyProviderObject = clazz.newInstance(); + } catch (Exception e) { + throw new KeyProviderException("Unable to load key provider class.", e); + } + if (!(keyProviderObject instanceof KeyProvider)) { + throw new KeyProviderException(keyProviderClass + + " specified in config is not a valid KeyProvider class."); + } + keyProvider = (KeyProvider) keyProviderObject; + } + key = keyProvider.getStorageAccountKey(accountName, configuration); + + if (key == null) { throw new ConfigurationPropertyNotFoundException(accountName); } - return accountKey; + return key; } public Configuration getConfiguration() { @@ -212,10 +254,22 @@ public class AbfsConfiguration{ return this.createRemoteFileSystemDuringInitialization; } + public boolean getSkipUserGroupMetadataDuringInitialization() { + return this.skipUserGroupMetadataDuringInitialization; + } + public int getReadAheadQueueDepth() { return this.readAheadQueueDepth; } + public boolean isFlushEnabled() { + return this.enableFlush; + } + + public String getCustomUserAgentPrefix() { + return this.userAgentId; + } + void validateStorageAccountKeys() throws InvalidConfigurationValueException { Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); @@ -294,4 +348,9 @@ public class AbfsConfiguration{ void setWriteBufferSize(int bufferSize) { this.writeBufferSize = bufferSize; } + + @VisibleForTesting + void setEnableFlush(boolean enableFlush) { + this.enableFlush = enableFlush; + } } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9f58f6b040a..b0a30a0b55c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,6 @@ public class AzureBlobFileSystem extends FileSystem { this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.userGroupInformation = UserGroupInformation.getCurrentUser(); this.user = userGroupInformation.getUserName(); - this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation); LOG.debug("Initializing NativeAzureFileSystem for {}", uri); @@ -98,7 +98,16 @@ public class AzureBlobFileSystem extends FileSystem { this.setWorkingDirectory(this.getHomeDirectory()); if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) { - this.createFileSystem(); + if (!this.fileSystemExists()) { + this.createFileSystem(); + } + } + + if (!abfsStore.getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) { + this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); + } else { + //Provide a default group name + this.primaryUserGroup = this.user; } } @@ -375,7 +384,7 @@ public class AzureBlobFileSystem extends FileSystem { if (file.getLen() < start) { return new BlockLocation[0]; } - final String blobLocationHost = this.abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); + final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); final String[] name = { blobLocationHost }; final String[] host = { blobLocationHost }; @@ -397,6 +406,13 @@ public class AzureBlobFileSystem extends FileSystem { return locations; } + @Override + protected void finalize() throws Throwable { + LOG.debug("finalize() called."); + close(); + super.finalize(); + } + public String getOwnerUser() { return user; } @@ -450,13 +466,31 @@ public class AzureBlobFileSystem extends FileSystem { } } + private boolean fileSystemExists() throws IOException { + LOG.debug( + "AzureBlobFileSystem.fileSystemExists uri: {}", uri); + try { + abfsStore.getFilesystemProperties(); + } catch (AzureBlobFileSystemException ex) { + try { + checkException(null, ex); + // Because HEAD request won't contain message body, + // there is not way to get the storage error code + // workaround here is to check its status code. + } catch (FileNotFoundException e) { + return false; + } + } + return true; + } + private void createFileSystem() throws IOException { LOG.debug( "AzureBlobFileSystem.createFileSystem uri: {}", uri); try { - this.abfsStore.createFilesystem(); + abfsStore.createFilesystem(); } catch (AzureBlobFileSystemException ex) { - checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); + checkException(null, ex); } } @@ -556,10 +590,10 @@ public class AzureBlobFileSystem extends FileSystem { //AbfsRestOperationException.getMessage() contains full error info including path/uri. if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) { - throw (IOException)new FileNotFoundException(ere.getMessage()) + throw (IOException) new FileNotFoundException(ere.getMessage()) .initCause(exception); } else if (statusCode == HttpURLConnection.HTTP_CONFLICT) { - throw (IOException)new FileAlreadyExistsException(ere.getMessage()) + throw (IOException) new FileAlreadyExistsException(ere.getMessage()) .initCause(exception); } else { throw ere; @@ -615,6 +649,11 @@ public class AzureBlobFileSystem extends FileSystem { @VisibleForTesting AzureBlobFileSystemStore getAbfsStore() { - return this.abfsStore; + return abfsStore; + } + + @VisibleForTesting + AbfsClient getAbfsClient() { + return abfsStore.getClient(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 8ac31ce0372..ba721497d74 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -31,8 +31,11 @@ import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.Hashtable; import java.util.Map; @@ -65,7 +68,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; -import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; @@ -75,8 +77,6 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.client.utils.URIBuilder; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -223,7 +223,7 @@ public class AzureBlobFileSystemStore { final OutputStream outputStream; outputStream = new FSDataOutputStream( new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize()), null); + abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); return outputStream; } @@ -287,7 +287,7 @@ public class AzureBlobFileSystemStore { final OutputStream outputStream; outputStream = new FSDataOutputStream( new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, abfsConfiguration.getWriteBufferSize()), null); + offset, abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); return outputStream; } @@ -366,7 +366,7 @@ public class AzureBlobFileSystemStore { true, 1, blockSize, - parseLastModifiedTime(lastModified).getMillis(), + parseLastModifiedTime(lastModified), path, eTag); } else { @@ -385,7 +385,7 @@ public class AzureBlobFileSystemStore { parseIsDirectory(resourceType), 1, blockSize, - parseLastModifiedTime(lastModified).getMillis(), + parseLastModifiedTime(lastModified), path, eTag); } @@ -419,10 +419,7 @@ public class AzureBlobFileSystemStore { long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { - final DateTime dateTime = DateTime.parse( - entry.lastModified(), - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); - lastModifiedMillis = dateTime.getMillis(); + lastModifiedMillis = parseLastModifiedTime(entry.lastModified()); } Path entryPath = new Path(File.separator + entry.name()); @@ -534,10 +531,16 @@ public class AzureBlobFileSystemStore { && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); } - private DateTime parseLastModifiedTime(final String lastModifiedTime) { - return DateTime.parse( - lastModifiedTime, - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + private long parseLastModifiedTime(final String lastModifiedTime) { + long parsedTime = 0; + try { + Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime); + parsedTime = utcDate.getTime(); + } catch (ParseException e) { + LOG.error("Failed to parse the date {0}", lastModifiedTime); + } finally { + return parsedTime; + } } private String convertXmsPropertiesToCommaSeparatedString(final Hashtable properties) throws @@ -663,7 +666,7 @@ public class AzureBlobFileSystemStore { } if (other instanceof VersionedFileStatus) { - return this.version.equals(((VersionedFileStatus)other).version); + return this.version.equals(((VersionedFileStatus) other).version); } return true; @@ -702,5 +705,9 @@ public class AzureBlobFileSystemStore { } } + @VisibleForTesting + AbfsClient getClient() { + return this.client; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index ead1003982b..9c805a2a7c2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -49,9 +49,15 @@ public final class ConfigurationKeys { public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; + public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization"; public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; + public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; + public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; + + public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider."; + public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 482158c1d73..1655d040493 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -39,7 +39,7 @@ public final class FileSystemConfigurations { private static final int ONE_MB = ONE_KB * ONE_KB; // Default upload and download buffer size - public static final int DEFAULT_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB @@ -50,10 +50,12 @@ public final class FileSystemConfigurations { public static final int MAX_CONCURRENT_WRITE_THREADS = 8; public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false; + public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; + public static final boolean DEFAULT_ENABLE_FLUSH = true; private FileSystemConfigurations() {} } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java new file mode 100644 index 00000000000..6723d699f56 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown if there is a problem instantiating a KeyProvider or retrieving a key + * using a KeyProvider object. + */ +@InterfaceAudience.Private +public class KeyProviderException extends AzureBlobFileSystemException { + private static final long serialVersionUID = 1L; + + public KeyProviderException(String message) { + super(message); + } + + public KeyProviderException(String message, Throwable cause) { + super(message); + } + + public KeyProviderException(Throwable t) { + super(t.getMessage()); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 90e580f9a68..a89f339967c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -43,6 +43,7 @@ public enum AzureServiceErrorCode { INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), + INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), UNKNOWN(null, -1, null); private final String errorCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java index 02a7ac9b01c..903ff69e9e3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java @@ -18,7 +18,7 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonProperty; import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java index baf06dca250..32597423c86 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java @@ -20,7 +20,7 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; import java.util.List; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonProperty; import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 2b3ccc0472d..60369be9bc7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -26,12 +26,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; - +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; @@ -44,7 +45,7 @@ public class AbfsClient { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; - private final String xMsVersion = "2018-03-28"; + private final String xMsVersion = "2018-06-17"; private final ExponentialRetryPolicy retryPolicy; private final String filesystem; private final AbfsConfiguration abfsConfiguration; @@ -59,7 +60,7 @@ public class AbfsClient { this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1); this.abfsConfiguration = abfsConfiguration; this.retryPolicy = exponentialRetryPolicy; - this.userAgent = initializeUserAgent(); + this.userAgent = initializeUserAgent(abfsConfiguration); } public String getFileSystem() { @@ -137,7 +138,7 @@ public class AbfsClient { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); @@ -380,8 +381,8 @@ public class AbfsClient { return url; } - private static String urlEncode(final String value) throws AzureBlobFileSystemException { - String encodedString = null; + public static String urlEncode(final String value) throws AzureBlobFileSystemException { + String encodedString; try { encodedString = URLEncoder.encode(value, UTF_8) .replace(PLUS, PLUS_ENCODE) @@ -393,14 +394,23 @@ public class AbfsClient { return encodedString; } - private String initializeUserAgent() { + @VisibleForTesting + String initializeUserAgent(final AbfsConfiguration abfsConfiguration) { final String userAgentComment = String.format(Locale.ROOT, "(JavaJRE %s; %s %s)", System.getProperty(JAVA_VERSION), System.getProperty(OS_NAME) .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING), System.getProperty(OS_VERSION)); - + String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix(); + if (customUserAgentId != null && !customUserAgentId.isEmpty()) { + return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId); + } return String.format(CLIENT_VERSION + " %s", userAgentComment); } + + @VisibleForTesting + URL getBaseUrl() { + return baseUrl; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 53f69004d8c..2bfcff25003 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -26,10 +26,11 @@ import java.net.URL; import java.util.List; import java.util.UUID; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.map.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +168,7 @@ public class AbfsHttpOperation { */ public AbfsHttpOperation(final URL url, final String method, final List requestHeaders) throws IOException { - this.isTraceEnabled = this.LOG.isTraceEnabled(); + this.isTraceEnabled = LOG.isTraceEnabled(); this.url = url; this.method = method; this.clientRequestId = UUID.randomUUID().toString(); @@ -303,7 +304,7 @@ public class AbfsHttpOperation { } } } catch (IOException ex) { - this.LOG.error("UnexpectedError: ", ex); + LOG.error("UnexpectedError: ", ex); throw ex; } finally { if (this.isTraceEnabled) { @@ -355,7 +356,7 @@ public class AbfsHttpOperation { return; } JsonFactory jf = new JsonFactory(); - try (JsonParser jp = jf.createParser(stream)) { + try (JsonParser jp = jf.createJsonParser(stream)) { String fieldName, fieldValue; jp.nextToken(); // START_OBJECT - { jp.nextToken(); // FIELD_NAME - "error": @@ -384,7 +385,7 @@ public class AbfsHttpOperation { // Ignore errors that occur while attempting to parse the storage // error, since the response may have been handled by the HTTP driver // or for other reasons have an unexpected - this.LOG.debug("ExpectedError: ", ex); + LOG.debug("ExpectedError: ", ex); } } @@ -415,7 +416,7 @@ public class AbfsHttpOperation { final ObjectMapper objectMapper = new ObjectMapper(); this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); } catch (IOException ex) { - this.LOG.error("Unable to deserialize list results", ex); + LOG.error("Unable to deserialize list results", ex); throw ex; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 848ce8ac953..960579dfaa3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -64,7 +64,7 @@ public class AbfsInputStream extends FSInputStream { this.path = path; this.contentLength = contentLength; this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors(); + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); this.eTag = eTag; this.tolerateOobAppends = false; this.readAheadEnabled = true; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 2dbcee57f59..b69ec835d6d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -43,6 +43,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { private final String path; private long position; private boolean closed; + private boolean supportFlush; private volatile IOException lastError; private long lastFlushOffset; @@ -61,11 +62,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable { final AbfsClient client, final String path, final long position, - final int bufferSize) { + final int bufferSize, + final boolean supportFlush) { this.client = client; this.path = path; this.position = position; this.closed = false; + this.supportFlush = supportFlush; this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; @@ -162,7 +165,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void flush() throws IOException { - flushInternalAsync(); + if (supportFlush) { + flushInternalAsync(); + } } /** Similar to posix fsync, flush out the data in client's user buffer @@ -171,7 +176,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hsync() throws IOException { - flushInternal(); + if (supportFlush) { + flushInternal(); + } } /** Flush out the data in client's user buffer. After the return of @@ -180,7 +187,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hflush() throws IOException { - flushInternal(); + if (supportFlush) { + flushInternal(); + } } /** @@ -262,7 +271,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { writeOperation.task.get(); } catch (Exception ex) { if (ex.getCause() instanceof AzureBlobFileSystemException) { - ex = (AzureBlobFileSystemException)ex.getCause(); + ex = (AzureBlobFileSystemException) ex.getCause(); } lastError = new IOException(ex); throw lastError; @@ -277,8 +286,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable { if (this.lastTotalAppendOffset > this.lastFlushOffset) { this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); } - - this.lastTotalAppendOffset = 0; } private synchronized void flushWrittenBytesToServiceInternal(final long offset, @@ -304,7 +311,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { - lastError = (AzureBlobFileSystemException)e.getCause(); + lastError = (AzureBlobFileSystemException) e.getCause(); } else { lastError = new IOException(e); } @@ -322,7 +329,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { try { completionService.take(); } catch (InterruptedException e) { - lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e); + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); throw lastError; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 61263985002..6dd32fafb7a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -121,7 +121,7 @@ public class AbfsRestOperation { } } - if (result.getStatusCode() > HttpURLConnection.HTTP_BAD_REQUEST) { + if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), result.getStorageErrorMessage(), null, result); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java index 36248533125..a200b406a55 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** * The UrlQueryBuilder for Rest AbfsClient. @@ -51,7 +52,12 @@ public class AbfsUriQueryBuilder { } else { sb.append(AbfsHttpConstants.AND_MARK); } - sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(entry.getValue()); + try { + sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(AbfsClient.urlEncode(entry.getValue())); + } + catch (AzureBlobFileSystemException ex) { + throw new IllegalArgumentException("Query string param is not encode-able: " + entry.getKey() + "=" + entry.getValue()); + } } return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java new file mode 100644 index 00000000000..27f76f8594f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; + +/** + * The interface that every Azure file system key provider must implement. + */ +public interface KeyProvider { + /** + * Key providers must implement this method. Given a list of configuration + * parameters for the specified Azure storage account, retrieve the plaintext + * storage account key. + * + * @param accountName + * the storage account name + * @param conf + * Hadoop configuration parameters + * @return the plaintext storage account key + * @throws KeyProviderException + */ + String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java new file mode 100644 index 00000000000..3fc05ff3e5e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.util.Shell; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Shell decryption key provider which invokes an external script that will + * perform the key decryption. + */ +public class ShellDecryptionKeyProvider extends SimpleKeyProvider { + private static final Logger LOG = LoggerFactory.getLogger(ShellDecryptionKeyProvider.class); + + @Override + public String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException { + String envelope = super.getStorageAccountKey(accountName, conf); + + final String command = conf.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT); + if (command == null) { + throw new KeyProviderException( + "Script path is not specified via fs.azure.shellkeyprovider.script"); + } + + String[] cmd = command.split(" "); + String[] cmdWithEnvelope = Arrays.copyOf(cmd, cmd.length + 1); + cmdWithEnvelope[cmdWithEnvelope.length - 1] = envelope; + + String decryptedKey = null; + try { + decryptedKey = Shell.execCommand(cmdWithEnvelope); + } catch (IOException ex) { + throw new KeyProviderException(ex); + } + + // trim any whitespace + return decryptedKey.trim(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java new file mode 100644 index 00000000000..cedae57f1bb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.security.ProviderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Key provider that simply returns the storage account key from the + * configuration as plaintext. + */ +public class SimpleKeyProvider implements KeyProvider { + private static final Logger LOG = LoggerFactory.getLogger(SimpleKeyProvider.class); + + @Override + public String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException { + String key = null; + try { + Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders( + conf, AzureBlobFileSystem.class); + char[] keyChars = c.getPassword(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); + if (keyChars != null) { + key = new String(keyChars); + } + } catch(IOException ioe) { + LOG.warn("Unable to get key from credential providers. {}", ioe); + } + return key; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 106fa09e438..b1f14856cf1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -27,10 +27,6 @@ import java.util.concurrent.Callable; import com.google.common.base.Preconditions; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; -import org.apache.hadoop.fs.azure.integration.AzureTestConstants; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; @@ -175,6 +169,17 @@ public abstract class AbstractAbfsIntegrationTest extends return abfs; } + public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception{ + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + return fs; + } + + public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception { + configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + return fs; + } + /** * Creates the filesystem; updates the {@link #abfs} field. * @return the created filesystem. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java new file mode 100644 index 00000000000..9c369bb2bf4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test continuation token which has equal sign. + */ +public final class ITestAbfsClient extends AbstractAbfsIntegrationTest { + private static final int LIST_MAX_RESULTS = 5000; + @Test + public void testContinuationTokenHavingEqualSign() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + AbfsClient abfsClient = fs.getAbfsClient(); + + try { + AbfsRestOperation op = abfsClient.listPath("/", true, LIST_MAX_RESULTS, "==========="); + Assert.assertTrue(false); + } catch (AbfsRestOperationException ex) { + Assert.assertEquals("InvalidQueryParameterValue", ex.getErrorCode().getErrorCode()); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java similarity index 90% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsReadWriteAndSeek.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index dd06fe3edfc..f62ea6e529e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.azurebfs.services; +package org.apache.hadoop.fs.azurebfs; import java.util.Arrays; import java.util.Random; @@ -28,8 +28,6 @@ import org.junit.runners.Parameterized; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.AbstractAbfsScaleTest; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; @@ -72,13 +70,13 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); - try(final FSDataOutputStream stream = fs.create(TEST_PATH)) { + try (FSDataOutputStream stream = fs.create(TEST_PATH)) { stream.write(b); } final byte[] readBuffer = new byte[2 * bufferSize]; int result; - try(final FSDataInputStream inputStream = fs.open(TEST_PATH)) { + try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { inputStream.seek(bufferSize); result = inputStream.read(readBuffer, bufferSize, bufferSize); assertNotEquals(-1, result); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 057dfa03115..f1800c00c16 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -108,7 +108,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { final byte[] b = new byte[1024 * 1000]; new Random().nextBytes(b); - try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE)) { stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java index 04690de2403..522b635e9d7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java @@ -91,7 +91,7 @@ public class ITestAzureBlobFileSystemE2EScale extends final FileSystem.Statistics abfsStatistics; int testBufferSize; final byte[] sourceData; - try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE)) { abfsStatistics = fs.getFsStatistics(); abfsStatistics.reset(); @@ -112,7 +112,7 @@ public class ITestAzureBlobFileSystemE2EScale extends remoteData.length, abfsStatistics.getBytesRead()); assertEquals("bytes written in " + stats, sourceData.length, abfsStatistics.getBytesWritten()); - assertEquals("bytesRead from read() call", testBufferSize, bytesRead ); + assertEquals("bytesRead from read() call", testBufferSize, bytesRead); assertArrayEquals("round tripped data", sourceData, remoteData); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java new file mode 100644 index 00000000000..e4acbaefc61 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.lang.ref.WeakReference; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +/** + * Test finalize() method when "fs.abfs.impl.disable.cache" is enabled. + */ +public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{ + static final String DISABLE_CACHE_KEY = "fs.abfs.impl.disable.cache"; + + public ITestAzureBlobFileSystemFinalize() throws Exception { + super(); + } + + @Test + public void testFinalize() throws Exception { + // Disable the cache for filesystem to make sure there is no reference. + Configuration configuration = this.getConfiguration(); + configuration.setBoolean(this.DISABLE_CACHE_KEY, true); + + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + + WeakReference ref = new WeakReference(fs); + fs = null; + + int i = 0; + int maxTries = 1000; + while (ref.get() != null && i < maxTries) { + System.gc(); + System.runFinalization(); + i++; + } + + Assert.assertTrue("testFinalizer didn't get cleaned up within maxTries", ref.get() == null); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index d90f0186da1..2f40b6444fd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -20,12 +20,20 @@ package org.apache.hadoop.fs.azurebfs; import java.util.ArrayList; import java.util.List; +import java.util.EnumSet; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.io.IOException; +import com.microsoft.azure.storage.blob.BlockEntry; +import com.microsoft.azure.storage.blob.BlockListingFilter; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.hamcrest.core.IsEqual; +import org.hamcrest.core.IsNot; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; @@ -46,6 +54,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { private static final int THREAD_SLEEP_TIME = 6000; private static final Path TEST_FILE_PATH = new Path("/testfile"); + private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8; + private static final int WAITING_TIME = 4000; public ITestAzureBlobFileSystemFlush() { super(); @@ -55,7 +65,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); final byte[] b; - try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { b = new byte[TEST_BUFFER_SIZE]; new Random().nextBytes(b); @@ -70,7 +80,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { } final byte[] r = new byte[TEST_BUFFER_SIZE]; - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) { + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) { while (inputStream.available() != 0) { int result = inputStream.read(r); @@ -84,7 +94,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { public void testAbfsOutputStreamSyncFlush() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); final byte[] b; - try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { b = new byte[TEST_BUFFER_SIZE]; new Random().nextBytes(b); stream.write(b); @@ -97,7 +107,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { } final byte[] r = new byte[TEST_BUFFER_SIZE]; - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) { + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) { int result = inputStream.read(r); assertNotEquals(-1, result); @@ -111,7 +121,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { final AzureBlobFileSystem fs = getFileSystem(); final FileSystem.Statistics abfsStatistics; ExecutorService es; - try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { abfsStatistics = fs.getFsStatistics(); abfsStatistics.reset(); @@ -160,7 +170,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { public void testWriteHeavyBytesToFileAsyncFlush() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); ExecutorService es = Executors.newFixedThreadPool(10); - try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { final byte[] b = new byte[TEST_BUFFER_SIZE]; new Random().nextBytes(b); @@ -196,4 +206,118 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH); assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); } + + @Test + public void testFlushWithFlushEnabled() throws Exception { + AzureBlobStorageTestAccount testAccount = createWasbTestAccount(); + String wasbUrl = testAccount.getFileSystem().getName(); + String abfsUrl = wasbUrlToAbfsUrl(wasbUrl); + final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl); + byte[] buffer = getRandomBytesArray(); + CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1)); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) { + // Wait for write request to be executed + Thread.sleep(WAITING_TIME); + stream.flush(); + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, null, null, null); + // verify block has been committed + assertEquals(1, blockList.size()); + } + } + + @Test + public void testFlushWithFlushDisabled() throws Exception { + AzureBlobStorageTestAccount testAccount = createWasbTestAccount(); + String wasbUrl = testAccount.getFileSystem().getName(); + String abfsUrl = wasbUrlToAbfsUrl(wasbUrl); + final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl); + byte[] buffer = getRandomBytesArray(); + CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1)); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) { + // Wait for write request to be executed + Thread.sleep(WAITING_TIME); + stream.flush(); + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, null, null, null); + // verify block has not been committed + assertEquals(0, blockList.size()); + } + } + + @Test + public void testHflushWithFlushEnabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) { + stream.hflush(); + validate(fs, TEST_FILE_PATH, buffer, true); + } + } + + @Test + public void testHflushWithFlushDisabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) { + stream.hflush(); + validate(fs, TEST_FILE_PATH, buffer, false); + } + } + + @Test + public void testHsyncWithFlushEnabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) { + stream.hsync(); + validate(fs, TEST_FILE_PATH, buffer, true); + } + } + + @Test + public void testHsyncWithFlushDisabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) { + stream.hsync(); + validate(fs, TEST_FILE_PATH, buffer, false); + } + } + + private byte[] getRandomBytesArray() { + final byte[] b = new byte[TEST_FILE_LENGTH]; + new Random().nextBytes(b); + return b; + } + + private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException { + fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush); + FSDataOutputStream stream = fs.create(path); + stream.write(buffer); + return stream; + } + + private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + this.getConfiguration()); + } + + private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException { + String filePath = path.toUri().toString(); + try (FSDataInputStream inputStream = fs.open(path)) { + byte[] readBuffer = new byte[TEST_FILE_LENGTH]; + int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length); + if (isEqual) { + assertArrayEquals( + String.format("Bytes read do not match bytes written to %1$s", filePath), writeBuffer, readBuffer); + } else { + assertThat( + String.format("Bytes read unexpectedly match bytes written to %1$s", + filePath), + readBuffer, + IsNot.not(IsEqual.equalTo(writeBuffer))); + } + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 5a6e46db016..874a8a34c59 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -20,10 +20,10 @@ package org.apache.hadoop.fs.azurebfs; import java.io.FileNotFoundException; -import org.junit.Test; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.junit.Test; + import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 1a0edaf54e8..07426c4ab22 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -30,7 +30,6 @@ import org.junit.Test; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; @@ -86,7 +85,7 @@ public class ITestAzureBlobFileSystemRename extends assertRenameOutcome(fs, test1, new Path("testDir/test10"), true); - assertPathDoesNotExist(fs, "rename source dir", test1 ); + assertPathDoesNotExist(fs, "rename source dir", test1); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java index 1c71125ffaf..7a7e3279da6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs; import java.util.Hashtable; -import org.junit.Ignore; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; @@ -28,8 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import static org.junit.Assert.assertEquals; - /** * Test FileSystemProperties. */ @@ -62,7 +59,6 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest { } @Test - @Ignore("JDK7 doesn't support PATCH, so PUT is used. Fix is applied in latest test tenant") public void testBase64FileSystemProperties() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java similarity index 97% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsConfigurationFieldsValidation.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java index ebaafa42e02..fb667ddc8a8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsConfigurationFieldsValidation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.azurebfs.services; +package org.apache.hadoop.fs.azurebfs; import java.lang.reflect.Field; @@ -48,7 +48,7 @@ import org.junit.Test; * Test ConfigurationServiceFieldsValidation. */ public class TestAbfsConfigurationFieldsValidation { - private final AbfsConfiguration abfsConfiguration; + private AbfsConfiguration abfsConfiguration; private static final String INT_KEY= "intKey"; private static final String LONG_KEY= "longKey"; @@ -89,10 +89,12 @@ public class TestAbfsConfigurationFieldsValidation { private boolean boolField; public TestAbfsConfigurationFieldsValidation() throws Exception { + super(); Base64 base64 = new Base64(); this.encodedString = new String(base64.encode("base64Value".getBytes(Charsets.UTF_8)), Charsets.UTF_8); this.encodedAccountKey = new String(base64.encode("someAccountKey".getBytes(Charsets.UTF_8)), Charsets.UTF_8); - Configuration configuration = new Configuration(false); + Configuration configuration = new Configuration(); + configuration.addResource("azure-bfs-test.xml"); configuration.set(INT_KEY, "1234565"); configuration.set(LONG_KEY, "4194304"); configuration.set(STRING_KEY, "stringValue"); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java index d365e6e9169..c0c5f91fabc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java @@ -43,9 +43,8 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract { @Override public String getScheme() { - return isSecure ? - FileSystemUriSchemes.ABFS_SECURE_SCHEME - : FileSystemUriSchemes.ABFS_SCHEME; + return isSecure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME + : FileSystemUriSchemes.ABFS_SCHEME; } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java new file mode 100644 index 00000000000..0b335a53e06 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.URL; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +/** + * Test useragent of abfs client. + * + */ +public final class TestAbfsClient { + + @Test + public void verifyUnknownUserAgent() throws Exception { + String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)"; + final Configuration configuration = new Configuration(); + configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY); + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration); + AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null); + String userAgent = abfsClient.initializeUserAgent(abfsConfiguration); + Pattern pattern = Pattern.compile(expectedUserAgentPattern); + Assert.assertTrue(pattern.matcher(userAgent).matches()); + } + + @Test + public void verifyUserAgent() throws Exception { + String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\) Partner Service"; + final Configuration configuration = new Configuration(); + configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service"); + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration); + AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null); + String userAgent = abfsClient.initializeUserAgent(abfsConfiguration); + Pattern pattern = Pattern.compile(expectedUserAgentPattern); + Assert.assertTrue(pattern.matcher(userAgent).matches()); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestShellDecryptionKeyProvider.java new file mode 100644 index 00000000000..d17e767724c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestShellDecryptionKeyProvider.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.File; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.util.Shell; + +import static org.junit.Assert.assertEquals; + +/** + * Test ShellDecryptionKeyProvider. + * + */ +public class TestShellDecryptionKeyProvider { + public static final Log LOG = LogFactory + .getLog(TestShellDecryptionKeyProvider.class); + private static final File TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp"), "TestShellDecryptionKeyProvider"); + + @Test + public void testScriptPathNotSpecified() throws Exception { + if (!Shell.WINDOWS) { + return; + } + ShellDecryptionKeyProvider provider = new ShellDecryptionKeyProvider(); + Configuration conf = new Configuration(); + String account = "testacct"; + String key = "key"; + + conf.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + account, key); + try { + provider.getStorageAccountKey(account, conf); + Assert + .fail("fs.azure.shellkeyprovider.script is not specified, we should throw"); + } catch (KeyProviderException e) { + LOG.info("Received an expected exception: " + e.getMessage()); + } + } + + @Test + public void testValidScript() throws Exception { + if (!Shell.WINDOWS) { + return; + } + String expectedResult = "decretedKey"; + + // Create a simple script which echoes the given key plus the given + // expected result (so that we validate both script input and output) + File scriptFile = new File(TEST_ROOT_DIR, "testScript.cmd"); + FileUtils.writeStringToFile(scriptFile, "@echo %1 " + expectedResult); + + ShellDecryptionKeyProvider provider = new ShellDecryptionKeyProvider(); + Configuration conf = new Configuration(); + String account = "testacct"; + String key = "key1"; + conf.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + account, key); + conf.set(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT, + "cmd /c " + scriptFile.getAbsolutePath()); + + String result = provider.getStorageAccountKey(account, conf); + assertEquals(key + " " + expectedResult, result); + } +}